This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c1503b   [tiered storage] Use NAR plugin to package offloaders  
(#2393)
9c1503b is described below

commit 9c1503b67eb122d2801fd9a9b5a94e5f7780efc0
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Wed Aug 29 00:52:29 2018 -0700

     [tiered storage] Use NAR plugin to package offloaders  (#2393)
    
    
     ### Motivation
    
    Offloader typically involves a new storage system, which usually involves 
dependencies that
    might be conflicting with the dependencies of Pulsar. We want to package 
offloader implementations
    as what we did for connectors, so people can decide which offloader to use 
and only include it.
    People would also become easier to write its offloader if needed.
    
     ### Changes
    
    - Update `tiered-storage/jcloud` to package it using nifi-nar plugin.
    - Add bunch of utils in managed-ledger to locate offloader nar packages and 
load `LedgerOffloaderFactory` from corresponding NAR package.
    - Add a new distribution module to package offloaders into one 
distribution, as what we did for connectors.
    - Update pulsar-all image to include offloaders
    
     ### NOTES
    
    This change doesn't get rid of jcloud-shaded. because jcloud is using 
`ServiceLoad` and Guice injection. It makes
    things very tricky on class loading. Not attempt to address the problem any 
time soon. We can pin `jcloud-shaded` version
    as what we did for `protobuf-shaded` in next release.
---
 conf/broker.conf                                   |   3 +
 distribution/{ => offloaders}/pom.xml              |  42 +++++-
 distribution/offloaders/src/assemble/README        |  10 ++
 .../offloaders/src/assemble/offloaders.xml         |  54 ++++++++
 distribution/pom.xml                               |   1 +
 distribution/server/src/assemble/LICENSE.bin.txt   |  14 --
 docker/pulsar-all/Dockerfile                       |   4 +
 docker/pulsar-all/pom.xml                          |  21 +++
 .../mledger/offload/OffloaderDefinition.java       |  46 +++++++
 .../bookkeeper/mledger/offload/OffloaderUtils.java | 149 +++++++++++++++++++++
 .../bookkeeper/mledger/offload/Offloaders.java     |  57 ++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 ++
 pulsar-broker/pom.xml                              |   6 -
 .../org/apache/pulsar/broker/PulsarService.java    |  20 ++-
 tiered-storage/jcloud/pom.xml                      |  10 +-
 .../impl/BlobStoreManagedLedgerOffloader.java      |   6 +
 .../META-INF/services/pulsar-offloader.yaml        |   9 +-
 17 files changed, 422 insertions(+), 41 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 11670d3..8927a85 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -513,6 +513,9 @@ 
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe
 
 ### --- Ledger Offloading --- ###
 
+# The directory for all the offloader implementations
+offloadersDirectory=./offloaders
+
 # Driver to use to offload old data to long term storage (Possible values: S3, 
aws-s3, google-cloud-storage)
 # When using google-cloud-storage, Make sure both Google Cloud Storage and 
Google Cloud Storage JSON API are enabled for
 # the project (check from Developers Console -> Api&auth -> APIs).
diff --git a/distribution/pom.xml b/distribution/offloaders/pom.xml
similarity index 53%
copy from distribution/pom.xml
copy to distribution/offloaders/pom.xml
index 36b4917..84349da 100644
--- a/distribution/pom.xml
+++ b/distribution/offloaders/pom.xml
@@ -24,17 +24,45 @@
 
   <parent>
     <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
+    <artifactId>distribution</artifactId>
     <version>2.2.0-incubating-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>distribution</artifactId>
+  <artifactId>pulsar-offloader-distribution</artifactId>
   <packaging>pom</packaging>
-  <name>Pulsar :: Distribution</name>
+  <name>Pulsar :: Distribution :: Offloader</name>
 
-  <modules>
-    <module>server</module>
-    <module>io</module>
-  </modules>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>distro-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <attach>true</attach>
+              <tarLongFileMode>posix</tarLongFileMode>
+              
<finalName>apache-pulsar-offloaders-${project.version}</finalName>
+              <descriptors>
+                <descriptor>src/assemble/offloaders.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/distribution/offloaders/src/assemble/README 
b/distribution/offloaders/src/assemble/README
new file mode 100644
index 0000000..6777ec2
--- /dev/null
+++ b/distribution/offloaders/src/assemble/README
@@ -0,0 +1,10 @@
+
+Please refer to http://pulsar.incubator.apache.org/ for access to 
documentation.
+
+This package contains Pulsar offloader archives. Each archive
+contains:
+
+ * the offloader code plus all the dependencies
+
+ * META-INF/DEPEDENCIES file with licensing information for all transitive
+   dependencies
diff --git a/distribution/offloaders/src/assemble/offloaders.xml 
b/distribution/offloaders/src/assemble/offloaders.xml
new file mode 100644
index 0000000..e9babe4
--- /dev/null
+++ b/distribution/offloaders/src/assemble/offloaders.xml
@@ -0,0 +1,54 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
+  <id>bin</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>true</includeBaseDirectory>
+  <files>
+    <file>
+      <source>${basedir}/../../DISCLAIMER</source>
+      <outputDirectory>.</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
+    <file>
+      <source>${basedir}/../../LICENSE</source>
+      <outputDirectory>.</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
+    <file>
+      <source>${basedir}/src/assemble/README</source>
+      <destName>README</destName>
+      <outputDirectory>.</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
+
+    <file>
+      
<source>${basedir}/../../tiered-storage/jcloud/target/tiered-storage-jcloud-${project.version}.nar</source>
+      <outputDirectory>offloaders</outputDirectory>
+      <fileMode>644</fileMode>
+    </file>
+  </files>
+</assembly>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 36b4917..80134e2 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -36,5 +36,6 @@
   <modules>
     <module>server</module>
     <module>io</module>
+    <module>offloaders</module>
   </modules>
 </project>
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 8da89a8..e0b1eb7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -317,7 +317,6 @@ The Apache Software License, Version 2.0
      - com.fasterxml.jackson.core-jackson-annotations-2.8.4.jar
      - com.fasterxml.jackson.core-jackson-core-2.8.4.jar
      - com.fasterxml.jackson.core-jackson-databind-2.8.4.jar
-     - com.fasterxml.jackson.dataformat-jackson-dataformat-cbor-2.6.7.jar
      - com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-2.8.4.jar
      - com.fasterxml.jackson.jaxrs-jackson-jaxrs-base-2.8.4.jar
      - com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.8.4.jar
@@ -359,7 +358,6 @@ The Apache Software License, Version 2.0
     - io.prometheus-simpleclient_hotspot-0.0.23.jar
     - io.prometheus-simpleclient_servlet-0.0.23.jar
  * Bean Validation API -- javax.validation-validation-api-1.1.0.Final.jar
- * Joda Time -- joda-time-joda-time-2.8.1.jar
  * Log4J
     - log4j-log4j-1.2.17.jar
     - org.apache.logging.log4j-log4j-api-2.10.0.jar
@@ -419,9 +417,6 @@ The Apache Software License, Version 2.0
  * OkHttp - com.squareup.okhttp-okhttp-2.5.0.jar
  * Okio - com.squareup.okio-okio-1.13.0.jar
  * Javassist -- org.javassist-javassist-3.21.0-GA.jar
- * Amazon AWS SDK
-    - com.amazonaws-aws-java-sdk-core-1.11.297.jar
-    - software.amazon.ion-ion-java-1.0.2.jar
   * gRPC
     - io.grpc-grpc-all-1.12.0.jar
     - io.grpc-grpc-auth-1.12.0.jar
@@ -455,10 +450,6 @@ The Apache Software License, Version 2.0
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
   * Objenesis
     - org.objenesis-objenesis-2.1.jar
-  * Java Dependency Injection
-    - javax.inject-javax.inject-1.jar
-  * java-xmlbuilder
-    - com.jamesmurty.utils-java-xmlbuilder-1.1.jar
 
 
 BSD 3-clause "New" or "Revised" License
@@ -492,7 +483,6 @@ Protocol Buffers License
 CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt
  * Java Annotations API
     - javax.annotation-javax.annotation-api-1.2.jar
-    - javax.annotation-jsr250-api-1.0.jar
  * Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar
  * WebSocket Server API -- javax.websocket-javax.websocket-api-1.0.jar
  * Java Web Service REST API -- javax.ws.rs-javax.ws.rs-api-2.1.jar
@@ -524,10 +514,6 @@ Eclipse Public License 1.0 -- licenses/LICENSE-AspectJ.txt
 Public Domain
  * XZ for Java -- licenses/LICENSE-xz.txt
     - org.tukaani-xz-1.5.jar
- * iHarder.net Base64
-    - net.iharder-base64-2.3.8.jar
- * AOP Alliance
-    - aopalliance-aopalliance-1.0.jar
 
 Public Domain (CC0) -- licenses/LICENSE-CC0.txt
  * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.0.jar
diff --git a/docker/pulsar-all/Dockerfile b/docker/pulsar-all/Dockerfile
index 46f7b9f..717a007 100644
--- a/docker/pulsar-all/Dockerfile
+++ b/docker/pulsar-all/Dockerfile
@@ -20,6 +20,10 @@
 FROM apachepulsar/pulsar:latest
 
 ARG PULSAR_IO_TARBALL
+ARG PULSAR_OFFLOADER_TARBALL
 
 ADD ${PULSAR_IO_TARBALL} /
 RUN mv /apache-pulsar-io-connectors-*/connectors /pulsar/connectors
+
+ADD ${PULSAR_OFFLOADER_TARBALL} /
+RUN mv /apache-pulsar-offloaders-*/offloaders /pulsar/offloaders
diff --git a/docker/pulsar-all/pom.xml b/docker/pulsar-all/pom.xml
index 5149999..519a498 100644
--- a/docker/pulsar-all/pom.xml
+++ b/docker/pulsar-all/pom.xml
@@ -40,6 +40,14 @@
       <type>tar.gz</type>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-offloader-distribution</artifactId>
+      <version>${project.parent.version}</version>
+      <classifier>bin</classifier>
+      <type>tar.gz</type>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
@@ -72,6 +80,18 @@
                   <excludeTransitive>true</excludeTransitive>
                 </configuration>
               </execution>
+              <execution>
+                <id>copy-offloader-tarball</id>
+                <goals>
+                  <goal>copy-dependencies</goal>
+                </goals>
+                <phase>generate-resources</phase>
+                <configuration>
+                  
<outputDirectory>${project.build.directory}/</outputDirectory>
+                  
<includeArtifactIds>pulsar-offloader-distribution</includeArtifactIds>
+                  <excludeTransitive>true</excludeTransitive>
+                </configuration>
+              </execution>
             </executions>
           </plugin>
           <plugin>
@@ -103,6 +123,7 @@
               <tag>${project.version}</tag>
               <buildArgs>
                 
<PULSAR_IO_TARBALL>target/pulsar-io-distribution-${project.version}-bin.tar.gz</PULSAR_IO_TARBALL>
+                
<PULSAR_OFFLOADER_TARBALL>target/pulsar-offloader-distribution-${project.version}-bin.tar.gz</PULSAR_OFFLOADER_TARBALL>
               </buildArgs>
             </configuration>
           </plugin>
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderDefinition.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderDefinition.java
new file mode 100644
index 0000000..414b7c8
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderDefinition.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Definition of an offloader NAR package.
+ */
+@Data
+@NoArgsConstructor
+public class OffloaderDefinition {
+
+    /**
+     * The name of the offloader type.
+     */
+    private String name;
+
+    /**
+     * Description to be used for user help.
+     */
+    private String description;
+
+    /**
+     * The class name for the offloader factory implementation.
+     */
+    private String offloaderFactoryClass;
+
+}
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
new file mode 100644
index 0000000..a1d3349
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+/**
+ * Utils to load offloaders
+ */
+@Slf4j
+public class OffloaderUtils {
+
+    private static final String PULSAR_OFFLOADER_SERVICE_NAME = 
"pulsar-offloader.yaml";
+
+    /**
+     * Extract the Pulsar offloader class from a offloader archive.
+     *
+     * @param narPath nar package path
+     * @return the offloader class name
+     * @throws IOException when fail to retrieve the pulsar offloader class
+     */
+    static Pair<NarClassLoader, LedgerOffloaderFactory> 
getOffloaderFactory(String narPath) throws IOException {
+        NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), 
Collections.emptySet());
+        String configStr = 
ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
+
+        OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml()
+            .readValue(configStr, OffloaderDefinition.class);
+        if (StringUtils.isEmpty(conf.getOffloaderFactoryClass())) {
+            throw new IOException(
+                String.format("The '%s' offloader does not provide an 
offloader factory implementation",
+                    conf.getName()));
+        }
+
+        try {
+            // Try to load offloader factory class and check it implements 
Offloader interface
+            Class factoryClass = 
ncl.loadClass(conf.getOffloaderFactoryClass());
+            CompletableFuture<LedgerOffloaderFactory> loadFuture = new 
CompletableFuture<>();
+            Thread loadingThread = new Thread(() -> {
+                Thread.currentThread().setContextClassLoader(ncl);
+
+                log.info("Loading offloader factory {} using class loader {}", 
factoryClass, ncl);
+                try {
+                    Object offloader = factoryClass.newInstance();
+                    if (!(offloader instanceof LedgerOffloaderFactory)) {
+                        throw new IOException("Class " + 
conf.getOffloaderFactoryClass() + " does not implement interface "
+                            + LedgerOffloaderFactory.class.getName());
+                    }
+                    loadFuture.complete((LedgerOffloaderFactory) offloader);
+                } catch (Throwable t) {
+                    loadFuture.completeExceptionally(t);
+                }
+            }, "load-factory-" + factoryClass);
+            try {
+                loadingThread.start();
+                return Pair.of(ncl, loadFuture.get());
+            } finally {
+                loadingThread.join();
+            }
+        } catch (Throwable t) {
+            rethrowIOException(t);
+        }
+        return null;
+    }
+
+    private static void rethrowIOException(Throwable cause)
+            throws IOException {
+        if (cause instanceof IOException) {
+            throw (IOException) cause;
+        } else if (cause instanceof RuntimeException) {
+            throw (RuntimeException) cause;
+        } else if (cause instanceof Error) {
+            throw (Error) cause;
+        } else {
+            throw new IOException(cause.getMessage(), cause);
+        }
+    }
+
+    public static OffloaderDefinition getOffloaderDefinition(String narPath) 
throws IOException {
+        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new 
File(narPath), Collections.emptySet())) {
+            String configStr = 
ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME);
+
+            return 
ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, 
OffloaderDefinition.class);
+        }
+    }
+
+    public static Offloaders searchForOffloaders(String connectorsDirectory) 
throws IOException {
+        Path path = Paths.get(connectorsDirectory).toAbsolutePath();
+        log.info("Searching for connectors in {}", path);
+
+        Offloaders offloaders = new Offloaders();
+
+        if (!path.toFile().exists()) {
+            log.warn("Offloaders archive directory not found");
+            return offloaders;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, 
"*.nar")) {
+            stream.forEach(archive -> {
+                try {
+                    OffloaderDefinition definition = 
getOffloaderDefinition(archive.toString());
+                    log.info("Found offloader {} from {}", definition, 
archive);
+
+                    if 
(!StringUtils.isEmpty(definition.getOffloaderFactoryClass())) {
+                        // Validate offloader factory class to be present and 
of the right type
+                        Pair<NarClassLoader,  LedgerOffloaderFactory> 
offloaderFactoryPair =
+                            getOffloaderFactory(archive.toString());
+                        if (null != offloaderFactoryPair) {
+                            
offloaders.getOffloaders().add(offloaderFactoryPair);
+                        }
+                    }
+                } catch (Throwable t) {
+                    log.warn("Failed to load offloader from {}", archive, t);
+                }
+            });
+        }
+        return offloaders;
+    }
+
+
+}
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
new file mode 100644
index 0000000..dfe4c31
--- /dev/null
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.nar.NarClassLoader;
+
+@Slf4j
+@Data
+public class Offloaders implements AutoCloseable {
+
+    private final List<Pair<NarClassLoader, LedgerOffloaderFactory>> 
offloaders = new ArrayList<>();
+
+    public LedgerOffloaderFactory getOffloaderFactory(String driverName) 
throws IOException {
+        for (Pair<NarClassLoader, LedgerOffloaderFactory> factory : 
offloaders) {
+            if (factory.getRight().isDriverSupported(driverName)) {
+                return factory.getRight();
+            }
+        }
+        throw new IOException("No offloader found for driver '" + driverName + 
"'." +
+            " Please make sure you dropped the offloader nar packages under 
`${PULSAR_HOME}/offloaders`.");
+    }
+
+    @Override
+    public void close() throws Exception {
+        offloaders.forEach(offloader -> {
+            try {
+                offloader.getLeft().close();
+            } catch (IOException e) {
+                log.warn("Failed to close nar class loader for offloader '{}': 
{}",
+                    offloader.getRight().getClass(), e.getMessage());
+            }
+        });
+    }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e94f6b9..65696f7 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -499,6 +499,9 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
      * NOTES: all implementation related settings should be put in 
implementation package.
      *        only common settings like driver name, io threads can be added 
here.
      ****/
+    // The directory to locate offloaders
+    private String offloadersDirectory = "./offloaders";
+
     // Driver to use to offload old data to long term storage
     private String managedLedgerOffloadDriver = null;
 
@@ -1728,6 +1731,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         return this.managedLedgerOffloadMaxThreads;
     }
 
+    public String getOffloadersDirectory() {
+        return offloadersDirectory;
+    }
+
+    public void setOffloadersDirectory(String dir) {
+        this.offloadersDirectory = dir;
+    }
+
     public void setBrokerServiceCompactionMonitorIntervalInSeconds(int 
interval) {
         this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
     }
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index f3410b5..fe575a7 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -99,12 +99,6 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>tiered-storage-jcloud</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker-common</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d965c7e..ca5ac5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
@@ -50,8 +51,10 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import 
org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory;
+import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
+import org.apache.bookkeeper.mledger.offload.Offloaders;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -140,6 +143,7 @@ public class PulsarService implements AutoCloseable {
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
     private OrderedScheduler offloaderScheduler;
+    private Offloaders offloaderManager = new Offloaders();
     private LedgerOffloader offloader;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
@@ -286,6 +290,8 @@ public class PulsarService implements AutoCloseable {
                 schemaRegistryService.close();
             }
 
+            offloaderManager.close();
+
             state = State.Closed;
 
         } catch (Exception e) {
@@ -664,11 +670,13 @@ public class PulsarService implements AutoCloseable {
     public synchronized LedgerOffloader 
createManagedLedgerOffloader(ServiceConfiguration conf)
             throws PulsarServerException {
         try {
-            // TODO: will make this configurable when switching to use NAR 
loader to load offloaders
-            LedgerOffloaderFactory offloaderFactory = 
JCloudLedgerOffloaderFactory.of();
-
-            if (conf.getManagedLedgerOffloadDriver() != null
-                && 
offloaderFactory.isDriverSupported(conf.getManagedLedgerOffloadDriver())) {
+            if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) {
+                checkNotNull(conf.getOffloadersDirectory(),
+                    "Offloader driver is configured to be '%s' but no 
offloaders directory is configured.",
+                    conf.getManagedLedgerOffloadDriver());
+                this.offloaderManager = 
OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory());
+                LedgerOffloaderFactory offloaderFactory = 
this.offloaderManager.getOffloaderFactory(
+                    conf.getManagedLedgerOffloadDriver());
                 try {
                     return offloaderFactory.create(
                         conf.getProperties(),
diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml
index fcd3300..a7d516e 100644
--- a/tiered-storage/jcloud/pom.xml
+++ b/tiered-storage/jcloud/pom.xml
@@ -42,7 +42,6 @@
       <groupId>org.apache.pulsar</groupId>
       <artifactId>jclouds-shaded</artifactId>
       <version>${project.version}</version>
-        <!--
       <exclusions>
         <exclusion>
           <groupId>com.google.code.gson</groupId>
@@ -69,7 +68,6 @@
           <artifactId>*</artifactId>
         </exclusion>
       </exclusions>
-      -->
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
@@ -129,4 +127,12 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
index f96afaf..b8c2727 100644
--- 
a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
+++ 
b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
@@ -49,6 +49,7 @@ import 
org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.jclouds.Constants;
 import org.jclouds.ContextBuilder;
+import org.jclouds.aws.s3.AWSS3ProviderMetadata;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;
@@ -61,8 +62,10 @@ import org.jclouds.domain.Location;
 import org.jclouds.domain.LocationBuilder;
 import org.jclouds.domain.LocationScope;
 import org.jclouds.googlecloud.GoogleCredentialsFromJson;
+import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata;
 import org.jclouds.io.Payload;
 import org.jclouds.io.Payloads;
+import org.jclouds.osgi.ProviderRegistry;
 import org.jclouds.s3.reference.S3Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,6 +120,9 @@ public class BlobStoreManagedLedgerOffloader implements 
LedgerOffloader {
         overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000");
         overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, 
Integer.toString(100));
 
+        ProviderRegistry.registerProvider(new AWSS3ProviderMetadata());
+        ProviderRegistry.registerProvider(new 
GoogleCloudStorageProviderMetadata());
+
         ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver);
         contextBuilder.credentials(credentials.identity, 
credentials.credential);
 
diff --git a/docker/pulsar-all/Dockerfile 
b/tiered-storage/jcloud/src/main/resources/META-INF/services/pulsar-offloader.yaml
similarity index 83%
copy from docker/pulsar-all/Dockerfile
copy to 
tiered-storage/jcloud/src/main/resources/META-INF/services/pulsar-offloader.yaml
index 46f7b9f..8f97287 100644
--- a/docker/pulsar-all/Dockerfile
+++ 
b/tiered-storage/jcloud/src/main/resources/META-INF/services/pulsar-offloader.yaml
@@ -17,9 +17,6 @@
 # under the License.
 #
 
-FROM apachepulsar/pulsar:latest
-
-ARG PULSAR_IO_TARBALL
-
-ADD ${PULSAR_IO_TARBALL} /
-RUN mv /apache-pulsar-io-connectors-*/connectors /pulsar/connectors
+name: jcloud
+description: JCloud based offloader implementation
+offloaderFactoryClass: 
org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory

Reply via email to