nlu90 commented on a change in pull request #9413:
URL: https://github.com/apache/pulsar/pull/9413#discussion_r570440500
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
##########
@@ -383,4 +377,112 @@ public static double roundDecimal(double value, int
places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
+
+ public static ClassLoader getClassLoaderFromPackage(
+
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType
componentType,
+ String className,
+ File packageFile,
+ String narExtractionDirectory) {
+ String connectorClassName = className;
+ ClassLoader classLoader;
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = FunctionCommon.extractNarClassLoader(packageFile,
narExtractionDirectory);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if connector class name is not provided, we can only try to load
archive as a NAR
+ if (isEmpty(connectorClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException(String.format("%s package
does not have the correct format. " +
+ "Pulsar cannot determine if the package is a
NAR package or JAR package. " +
+ "%s classname is not provided and attempts to
load it as a NAR package produced the following error.",
+ capFirstLetter(componentType),
capFirstLetter(componentType)),
+ narClassLoaderException);
+ }
+ try {
+ if (componentType ==
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE)
{
+ connectorClassName =
ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
+ } else {
+ connectorClassName =
ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to
extract %s class from archive",
+ componentType.toString().toLowerCase()), e);
+ }
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
capFirstLetter(componentType), connectorClassName), e);
+ }
+
+ } else {
+ // if connector class name is provided, we need to try to load it
as a JAR and as a NAR.
Review comment:
s/and/or
##########
File path:
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
##########
@@ -510,39 +514,35 @@ private File getBuiltinArchive(FunctionDetails.Builder
functionDetails) throws I
throw new IOException("Could not find built in archive definition");
}
- private void fillSourceTypeClass(FunctionDetails.Builder functionDetails,
File archive, String className)
- throws IOException, ClassNotFoundException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive,
Collections.emptySet(), workerConfig.getNarExtractionDirectory())) {
- String typeArg = getSourceType(className, ncl).getName();
+ private void fillSourceTypeClass(FunctionDetails.Builder functionDetails,
Review comment:
this method and the following `fillSinkTypeClass` method looks very
similar, wondering if we could implement them into one single method to reduce
code duplication.
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
##########
@@ -383,4 +377,112 @@ public static double roundDecimal(double value, int
places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
+
+ public static ClassLoader getClassLoaderFromPackage(
+
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType
componentType,
+ String className,
+ File packageFile,
+ String narExtractionDirectory) {
+ String connectorClassName = className;
+ ClassLoader classLoader;
+ ClassLoader jarClassLoader = null;
+ ClassLoader narClassLoader = null;
+
+ Exception jarClassLoaderException = null;
+ Exception narClassLoaderException = null;
+
+ try {
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(packageFile);
+ } catch (Exception e) {
+ jarClassLoaderException = e;
+ }
+ try {
+ narClassLoader = FunctionCommon.extractNarClassLoader(packageFile,
narExtractionDirectory);
+ } catch (Exception e) {
+ narClassLoaderException = e;
+ }
+
+ // if connector class name is not provided, we can only try to load
archive as a NAR
+ if (isEmpty(connectorClassName)) {
+ if (narClassLoader == null) {
+ throw new IllegalArgumentException(String.format("%s package
does not have the correct format. " +
+ "Pulsar cannot determine if the package is a
NAR package or JAR package. " +
+ "%s classname is not provided and attempts to
load it as a NAR package produced the following error.",
+ capFirstLetter(componentType),
capFirstLetter(componentType)),
+ narClassLoaderException);
+ }
+ try {
+ if (componentType ==
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE)
{
+ connectorClassName =
ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
+ } else {
+ connectorClassName =
ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to
extract %s class from archive",
+ componentType.toString().toLowerCase()), e);
+ }
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
capFirstLetter(componentType), connectorClassName), e);
+ }
+
+ } else {
+ // if connector class name is provided, we need to try to load it
as a JAR and as a NAR.
+ if (jarClassLoader != null) {
+ try {
+ jarClassLoader.loadClass(connectorClassName);
+ classLoader = jarClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ // class not found in JAR try loading as a NAR and
searching for the class
+ if (narClassLoader != null) {
+
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError
e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in
class path",
+ capFirstLetter(componentType),
connectorClassName), e1);
+ }
+ } else {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class
path", capFirstLetter(componentType),
+ connectorClassName), e);
+ }
+ }
+ } else if (narClassLoader != null) {
+ try {
+ narClassLoader.loadClass(connectorClassName);
+ classLoader = narClassLoader;
+ } catch (ClassNotFoundException | NoClassDefFoundError e1) {
+ throw new IllegalArgumentException(
+ String.format("%s class %s must be in class path",
+ capFirstLetter(componentType),
connectorClassName), e1);
+ }
+ } else {
+ StringBuilder errorMsg = new
StringBuilder(capFirstLetter(componentType)
+ + " package does not have the correct format."
+ + " Pulsar cannot determine if the package is a NAR
package or JAR package.");
+
+ if (jarClassLoaderException != null) {
+ errorMsg.append("Attempts to load it as a JAR package
produced error: " + jarClassLoaderException.getMessage());
+ }
+
Review comment:
do you want to add some linebreak or delimiter between these two error
message for better reading?
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
##########
@@ -102,57 +99,47 @@ public static String getIOSinkClass(ClassLoader
classLoader) throws IOException
return conf.getSinkClass();
}
- public static ConnectorDefinition getConnectorDefinition(String narPath,
String narExtractionDirectory) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new
File(narPath), Collections.emptySet(), narExtractionDirectory)) {
- return getConnectorDefinition(ncl);
- }
- }
-
- public static ConnectorDefinition getConnectorDefinition(ClassLoader
classLoader) throws IOException {
- NarClassLoader narClassLoader = (NarClassLoader) classLoader;
+ public static ConnectorDefinition getConnectorDefinition(NarClassLoader
narClassLoader) throws IOException {
String configStr =
narClassLoader.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
ConnectorDefinition.class);
}
- public static List<ConfigFieldDefinition>
getConnectorConfigDefinition(String narPath,
-
String configClassName,
-
String narExtractionDirectory) throws Exception {
+ public static List<ConfigFieldDefinition>
getConnectorConfigDefinition(ClassLoader narClassLoader,
Review comment:
Do you mean `NarClassLoader narClassLoader` here?
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
##########
@@ -383,4 +377,112 @@ public static double roundDecimal(double value, int
places) {
double scale = Math.pow(10, places);
return Math.round(value * scale) / scale;
}
+
+ public static ClassLoader getClassLoaderFromPackage(
+
org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType
componentType,
+ String className,
+ File packageFile,
+ String narExtractionDirectory) {
+ String connectorClassName = className;
+ ClassLoader classLoader;
Review comment:
instead of having this variable and return it at the end. I suggest
return the classLoader at the end of each if/else clause. Easier to read
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]