This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e01ccc2  Initial code dump for dynamically loading pinot plugins 
(#4899)
e01ccc2 is described below

commit e01ccc2b98a342dce7f3bd0d547cf7c3c66035e7
Author: Kishore Gopalakrishna <[email protected]>
AuthorDate: Sat Dec 7 23:05:01 2019 -0800

    Initial code dump for dynamically loading pinot plugins (#4899)
    
    * Initial code dump for dynamically loading pinot plugins
    
    * Adding license header
    
    * Adding more methods to PluginManager
    
    * Addressing comments, added a test case
---
 pinot-record-readers/pom.xml                       |   1 +
 .../java/org/apache/pinot/spi/plugin/Plugin.java   |  48 ++++++
 .../apache/pinot/spi/plugin/PluginClassLoader.java | 149 ++++++++++++++++++
 .../org/apache/pinot/spi/plugin/PluginManager.java | 175 +++++++++++++++++++++
 .../apache/pinot/spi/plugin/PluginManagerTest.java |  93 +++++++++++
 pinot-spi/src/test/resources/TestRecordReader.java |  61 +++++++
 6 files changed, 527 insertions(+)

diff --git a/pinot-record-readers/pom.xml b/pinot-record-readers/pom.xml
index 1dc63dc..96c9578 100644
--- a/pinot-record-readers/pom.xml
+++ b/pinot-record-readers/pom.xml
@@ -50,6 +50,7 @@
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-spi</artifactId>
+      <scope>provided</scope>
     </dependency>
     <!-- test -->
     <dependency>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/Plugin.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/Plugin.java
new file mode 100644
index 0000000..312e25e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/Plugin.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.util.Objects;
+
+
+/**
+ * Identifies a plugin by name.
+ *
+ * <p>Instances are used as keys of the {@code PluginManager} registry map, so
+ * equality and hashing are based solely on the plugin name.
+ */
+public class Plugin {
+
+  // Final because the name feeds equals/hashCode: changing it after the
+  // instance has been used as a map key would make the entry unreachable.
+  final String _name;
+
+  /**
+   * @param name the plugin name; stored as given (no validation is applied)
+   */
+  public Plugin(String name) {
+    _name = name;
+  }
+
+  /**
+   * Two plugins are equal when they are of the same class and carry equal
+   * names (two null names compare as equal).
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Plugin plugin = (Plugin) o;
+    return Objects.equals(_name, plugin._name);
+  }
+
+  /**
+   * Hash derived from the name only, consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(_name);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginClassLoader.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginClassLoader.java
new file mode 100644
index 0000000..b58b29f
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginClassLoader.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+
+/**
+ * Class loader for the jars of one plugin.
+ *
+ * <p>Classes and single resources are looked up in this order: the system
+ * class loader, the plugin's own jar URLs, and finally the regular
+ * {@link URLClassLoader} delegation to the parent.
+ */
+public class PluginClassLoader extends URLClassLoader {
+
+  private final ClassLoader sysClzLoader;
+
+  /**
+   * Creates the loader and, where the JVM allows it, also appends the plugin
+   * jars to the system class loader's search path.
+   *
+   * @param urls jar URLs that make up the plugin
+   * @param parent parent class loader, used as the last lookup step
+   */
+  public PluginClassLoader(URL[] urls, ClassLoader parent) {
+    super(urls, parent);
+    sysClzLoader = getSystemClassLoader();
+    addToSystemClassLoader(urls, sysClzLoader);
+  }
+
+  /**
+   * Appends {@code urls} to {@code target} through the protected
+   * {@code URLClassLoader.addURL} method.
+   *
+   * <p>This is a no-op when {@code target} is not a {@link URLClassLoader}
+   * (the system class loader is not one on Java 9 and later); the previous
+   * unconditional cast failed with a ClassCastException there. In that case
+   * plugin classes are still found through this loader's own URLs.
+   */
+  private static void addToSystemClassLoader(URL[] urls, ClassLoader target) {
+    if (!(target instanceof URLClassLoader)) {
+      return;
+    }
+    try {
+      Method addUrl =
+          URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+      addUrl.setAccessible(true);
+      for (URL url : urls) {
+        addUrl.invoke(target, url);
+      }
+    } catch (Exception e) {
+      // Propagate unchanged; callers see the original failure.
+      ExceptionUtils.rethrow(e);
+    }
+  }
+
+  /**
+   * Loads {@code name} from, in order: the classes this loader has already
+   * defined, the system class loader, the plugin jars, and the parent.
+   *
+   * @throws ClassNotFoundException if none of the lookup steps finds the class
+   */
+  @Override
+  protected Class<?> loadClass(String name, boolean resolve)
+      throws ClassNotFoundException {
+    // Same lock ClassLoader.loadClass uses, so two threads cannot both reach
+    // findClass() for the same name and try to define the class twice.
+    synchronized (getClassLoadingLock(name)) {
+      Class<?> loadedClass = findLoadedClass(name);
+      if (loadedClass == null) {
+        loadedClass = loadFromSystemClassLoader(name);
+      }
+      if (loadedClass == null) {
+        try {
+          // look in the plugin jars handed to the constructor
+          loadedClass = findClass(name);
+        } catch (ClassNotFoundException e) {
+          // Not in the plugin jars: fall back to standard parent delegation,
+          // which throws ClassNotFoundException if the class is still missing.
+          loadedClass = super.loadClass(name, resolve);
+        }
+      }
+      if (resolve) {
+        resolveClass(loadedClass);
+      }
+      return loadedClass;
+    }
+  }
+
+  /**
+   * Returns the class from the system class loader, or null when there is no
+   * system class loader or it does not know the class.
+   */
+  private Class<?> loadFromSystemClassLoader(String name) {
+    if (sysClzLoader == null) {
+      return null;
+    }
+    try {
+      return sysClzLoader.loadClass(name);
+    } catch (ClassNotFoundException ignored) {
+      // expected for plugin-only classes; the caller tries the next source
+      return null;
+    }
+  }
+
+  /**
+   * Collects the resources named {@code name} from the system class loader,
+   * the plugin jars and the parent class loader, in that order. Each URL is
+   * reported once even when several of these sources expose it.
+   */
+  @Override
+  public Enumeration<URL> getResources(String name)
+      throws IOException {
+    List<URL> allRes = new LinkedList<>();
+    List<String> seen = new LinkedList<>();
+
+    if (sysClzLoader != null) {
+      appendUnique(sysClzLoader.getResources(name), allRes, seen);
+    }
+
+    // resources inside the plugin jars
+    appendUnique(findResources(name), allRes, seen);
+
+    // Ask the parent itself: super.findResources() is the same method as
+    // findResources() above and would only repeat the plugin's own resources.
+    ClassLoader parent = getParent();
+    if (parent != null) {
+      appendUnique(parent.getResources(name), allRes, seen);
+    }
+
+    final Iterator<URL> it = allRes.iterator();
+    return new Enumeration<URL>() {
+      @Override
+      public boolean hasMoreElements() {
+        return it.hasNext();
+      }
+
+      @Override
+      public URL nextElement() {
+        return it.next();
+      }
+    };
+  }
+
+  /**
+   * Adds every URL of {@code found} to {@code target} unless a URL with the
+   * same external form was added before. URLs are compared by their string
+   * form rather than with URL.equals().
+   */
+  private static void appendUnique(Enumeration<URL> found, List<URL> target,
+      List<String> seen) {
+    if (found == null) {
+      return;
+    }
+    while (found.hasMoreElements()) {
+      URL url = found.nextElement();
+      String externalForm = url.toExternalForm();
+      if (!seen.contains(externalForm)) {
+        seen.add(externalForm);
+        target.add(url);
+      }
+    }
+  }
+
+  /**
+   * Finds a single resource using the same order as class loading: system
+   * class loader, plugin jars, then the default parent-first lookup.
+   *
+   * @return the first match, or null if the resource is not found anywhere
+   */
+  @Override
+  public URL getResource(String name) {
+    URL res = null;
+    if (sysClzLoader != null) {
+      res = sysClzLoader.getResource(name);
+    }
+    if (res == null) {
+      res = findResource(name);
+    }
+    if (res == null) {
+      res = super.getResource(name);
+    }
+    return res;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
new file mode 100644
index 0000000..d6b5458
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+
+
+/**
+ * Singleton registry that maps a plugin name to the class loader holding that
+ * plugin's jars, and loads classes / creates instances through it.
+ *
+ * <p>Class names may be given either as {@code com.x.y.Foo} (resolved with the
+ * {@link #DEFAULT_PLUGIN_NAME} plugin) or as {@code pluginName:com.x.y.Foo}.
+ */
+public class PluginManager {
+
+  public static final String DEFAULT_PLUGIN_NAME = "DEFAULT";
+  static PluginManager PLUGIN_MANAGER = new PluginManager();
+
+  Map<Plugin, PluginClassLoader> _registry = new HashMap<>();
+
+  private PluginManager() {
+    // The default plugin has no jars of its own.
+    _registry.put(new Plugin(DEFAULT_PLUGIN_NAME),
+        createClassLoader(Collections.<URL>emptyList()));
+  }
+
+  /**
+   * Registers a plugin whose classes come from all {@code *.jar} files found
+   * recursively under {@code directory}. Loading a name that is already
+   * registered replaces the previous class loader.
+   *
+   * @param pluginName name under which the plugin is registered
+   * @param directory root directory that is searched for jar files
+   */
+  public void load(String pluginName, File directory) {
+    Collection<File> jarFiles =
+        FileUtils.listFiles(directory, new String[]{"jar"}, true);
+    Collection<URL> urlList = new ArrayList<>();
+    for (File jarFile : jarFiles) {
+      try {
+        urlList.add(jarFile.toURI().toURL());
+      } catch (MalformedURLException ignored) {
+        // Best effort, as before: a jar whose path cannot be expressed as a
+        // URL is left out of the plugin.
+      }
+    }
+    PluginClassLoader classLoader = createClassLoader(urlList);
+    _registry.put(new Plugin(pluginName), classLoader);
+  }
+
+  /**
+   * Builds a class loader over {@code urlList}, ordered by the URLs' string
+   * form so the jar search order does not depend on directory listing order.
+   */
+  private PluginClassLoader createClassLoader(Collection<URL> urlList) {
+    URL[] urls = urlList.toArray(new URL[0]);
+    // java.net.URL is not Comparable, so Arrays.sort(urls) without a
+    // comparator throws ClassCastException once there are two or more jars.
+    Arrays.sort(urls, (left, right) ->
+        left.toExternalForm().compareTo(right.toExternalForm()));
+    return new PluginClassLoader(urls, this.getClass().getClassLoader());
+  }
+
+  /**
+   * Returns the plugin part of {@code qualifiedName} (the text before the
+   * first ':'), or {@link #DEFAULT_PLUGIN_NAME} when there is no ':'.
+   */
+  private static String pluginNameOf(String qualifiedName) {
+    int separator = qualifiedName.indexOf(':');
+    if (separator < 0) {
+      return DEFAULT_PLUGIN_NAME;
+    }
+    return qualifiedName.substring(0, separator);
+  }
+
+  /**
+   * Returns the class-name part of {@code qualifiedName}: the text after the
+   * first ':', or the whole string when there is no ':'.
+   */
+  private static String classNameOf(String qualifiedName) {
+    return qualifiedName.substring(qualifiedName.indexOf(':') + 1);
+  }
+
+  /**
+   * Looks up the class loader registered for {@code pluginName}.
+   *
+   * @throws IllegalArgumentException if no such plugin has been loaded
+   */
+  private PluginClassLoader classLoaderFor(String pluginName) {
+    PluginClassLoader classLoader = _registry.get(new Plugin(pluginName));
+    if (classLoader == null) {
+      throw new IllegalArgumentException(
+          "No plugin loaded with name: " + pluginName);
+    }
+    return classLoader;
+  }
+
+  /**
+   * Loads a class. The class name can be in any of the following formats:
+   * <ul>
+   * <li>com.x.y.foo - loads the class with the default plugin</li>
+   * <li>pluginName:com.x.y.foo - loads the class with the plugin specific
+   * classloader</li>
+   * </ul>
+   * @param className plain or plugin-qualified class name
+   * @return the loaded class
+   * @throws ClassNotFoundException if the class cannot be found
+   * @throws IllegalArgumentException if the named plugin is not loaded
+   */
+  public Class<?> loadClass(String className)
+      throws ClassNotFoundException {
+    return loadClass(pluginNameOf(className), classNameOf(className));
+  }
+
+  /**
+   * Loads a class using the plugin specific class loader.
+   * @param pluginName name the plugin was registered under
+   * @param className fully qualified class name
+   * @return the loaded class
+   * @throws ClassNotFoundException if the class cannot be found
+   * @throws IllegalArgumentException if the named plugin is not loaded
+   */
+  public Class<?> loadClass(String pluginName, String className)
+      throws ClassNotFoundException {
+    return classLoaderFor(pluginName).loadClass(className, true);
+  }
+
+  /**
+   * Creates an instance of the class through its public no-arg constructor.
+   * The className can be in any of the following formats:
+   * <ul>
+   * <li>com.x.y.foo - loads the class with the default plugin</li>
+   * <li>pluginName:com.x.y.foo - loads the class with the plugin specific
+   * classloader</li>
+   * </ul>
+   * @param className plain or plugin-qualified class name
+   * @param <T> type the caller expects; the cast is unchecked
+   * @return the new instance
+   * @throws Exception if the class cannot be loaded or instantiated
+   */
+  public <T> T createInstance(String className)
+      throws Exception {
+    return createInstance(className, new Class[]{}, new Object[]{});
+  }
+
+  /**
+   * Creates an instance of the class through the public constructor matching
+   * {@code argTypes}. The className can be in any of the following formats:
+   * <ul>
+   * <li>com.x.y.foo - loads the class with the default plugin</li>
+   * <li>pluginName:com.x.y.foo - loads the class with the plugin specific
+   * classloader</li>
+   * </ul>
+   * @param className plain or plugin-qualified class name
+   * @param argTypes parameter types of the constructor to use
+   * @param argValues arguments passed to that constructor
+   * @param <T> type the caller expects; the cast is unchecked
+   * @return the new instance
+   * @throws Exception if the class cannot be loaded or instantiated
+   */
+  public <T> T createInstance(String className, Class[] argTypes,
+      Object[] argValues)
+      throws Exception {
+    return createInstance(pluginNameOf(className), classNameOf(className),
+        argTypes, argValues);
+  }
+
+  /**
+   * Creates an instance of className through its public no-arg constructor,
+   * using the classloader specific to the plugin.
+   * @param pluginName name the plugin was registered under
+   * @param className fully qualified class name
+   * @param <T> type the caller expects; the cast is unchecked
+   * @return the new instance
+   * @throws Exception if the class cannot be loaded or instantiated
+   */
+  public <T> T createInstance(String pluginName, String className)
+      throws Exception {
+    return createInstance(pluginName, className, new Class[]{},
+        new Object[]{});
+  }
+
+  /**
+   * Creates an instance of className through the public constructor matching
+   * {@code argTypes}, using the classloader specific to the plugin.
+   * @param pluginName name the plugin was registered under
+   * @param className fully qualified class name
+   * @param argTypes parameter types of the constructor to use
+   * @param argValues arguments passed to that constructor
+   * @param <T> type the caller expects; the cast is unchecked
+   * @return the new instance (never null)
+   * @throws Exception if the class cannot be loaded, has no matching public
+   *     constructor, or the constructor fails
+   * @throws IllegalArgumentException if the named plugin is not loaded
+   */
+  @SuppressWarnings("unchecked")
+  public <T> T createInstance(String pluginName, String className,
+      Class[] argTypes, Object[] argValues)
+      throws Exception {
+    Class<?> loadedClass =
+        classLoaderFor(pluginName).loadClass(className, true);
+    // getConstructor never returns null; it throws NoSuchMethodException.
+    Constructor<?> constructor = loadedClass.getConstructor(argTypes);
+    return (T) constructor.newInstance(argValues);
+  }
+
+  /**
+   * @return the process-wide PluginManager instance
+   */
+  public static PluginManager get() {
+    return PLUGIN_MANAGER;
+  }
+}
+
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
new file mode 100644
index 0000000..ac5a305
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URL;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Compiles the TestRecordReader.java test resource, packages it into a jar
+ * and checks that PluginManager can load it as a plugin and instantiate it.
+ */
+public class PluginManagerTest {
+
+  private File tempDir;
+  private String jarFile;
+  private File jarDirFile;
+
+  @BeforeClass
+  public void setup() {
+
+    tempDir = new File(System.getProperty("java.io.tmpdir"),
+        "pinot-plugin-test");
+    // File.delete() cannot remove a non-empty directory, so leftovers from an
+    // earlier run would survive; delete recursively instead.
+    FileUtils.deleteQuietly(tempDir);
+    tempDir.mkdirs();
+
+    String jarDir = tempDir + "/" + "test-record-reader";
+    jarFile = jarDir + "/" + "test-record-reader.jar";
+    jarDirFile = new File(jarDir);
+    jarDirFile.mkdirs();
+  }
+
+  @Test
+  public void testSimple()
+      throws Exception {
+    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+    ClassLoader contextLoader =
+        Thread.currentThread().getContextClassLoader();
+    URL javaFile = contextLoader.getResource("TestRecordReader.java");
+    if (javaFile != null) {
+      compiler.run(null, null, null, javaFile.getFile(), "-d",
+          tempDir.getAbsolutePath());
+
+      // NOTE(review): the compiler writes TestRecordReader.class under
+      // tempDir, while this lookup goes through the context class loader. If
+      // tempDir is not on the test classpath the lookup returns null and the
+      // test passes without asserting anything - confirm this is intended.
+      URL classFile = contextLoader.getResource("TestRecordReader.class");
+
+      if (classFile != null) {
+        File compiledClass = new File(classFile.getFile());
+        // try-with-resources so the jar is closed even if writing fails
+        try (JarOutputStream jos =
+            new JarOutputStream(new FileOutputStream(jarFile))) {
+          jos.putNextEntry(new JarEntry(compiledClass.getName()));
+          jos.write(FileUtils.readFileToByteArray(compiledClass));
+          jos.closeEntry();
+        }
+
+        PluginManager.get().load("test-record-reader", jarDirFile);
+
+        RecordReader testRecordReader = PluginManager.get()
+            .createInstance("test-record-reader", "TestRecordReader");
+        testRecordReader.init(null, null, null);
+        int count = 0;
+        while (testRecordReader.hasNext()) {
+          testRecordReader.next();
+          count++;
+        }
+
+        Assert.assertEquals(count, 10);
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    // jarDirFile lives inside tempDir, so one recursive delete removes both.
+    FileUtils.deleteQuietly(tempDir);
+  }
+}
diff --git a/pinot-spi/src/test/resources/TestRecordReader.java 
b/pinot-spi/src/test/resources/TestRecordReader.java
new file mode 100644
index 0000000..8f212a1
--- /dev/null
+++ b/pinot-spi/src/test/resources/TestRecordReader.java
@@ -0,0 +1,61 @@
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+
+
+/**
+ * Test-only record reader. It ignores the data file and the reader config and
+ * serves ten in-memory rows, each with the single field "key" set to
+ * "value-0" .. "value-9".
+ */
+public class TestRecordReader implements RecordReader {
+
+  List<GenericRow> _rows = new ArrayList<>();
+  Iterator<GenericRow> _iterator;
+  Schema _schema;
+
+  /**
+   * Remembers the schema and (re)builds the ten synthetic rows. The data file
+   * and reader config are not used.
+   */
+  public void init(File dataFile, Schema schema,
+      @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _schema = schema;
+    // Start from an empty list so calling init() again still yields exactly
+    // ten rows instead of appending ten more.
+    _rows.clear();
+    int numRows = 10;
+    for (int i = 0; i < numRows; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue("key", "value-" + i);
+      _rows.add(row);
+    }
+    _iterator = _rows.iterator();
+  }
+
+  /** Returns whether another row is available. */
+  public boolean hasNext() {
+    return _iterator.hasNext();
+  }
+
+  /** Returns the next synthetic row. */
+  public GenericRow next()
+      throws IOException {
+    return _iterator.next();
+  }
+
+  /** Returns the next synthetic row; the {@code reuse} argument is ignored. */
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    return _iterator.next();
+  }
+
+  /** Restarts iteration from the first row. */
+  public void rewind()
+      throws IOException {
+    _iterator = _rows.iterator();
+  }
+
+  /** Returns the schema passed to init (null before init is called). */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /** Nothing to release: all rows live in memory. */
+  public void close() {
+
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to