This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new cf95d12257f [fix][fn] fix function failed to start if no `typeClassName` provided in `FunctionDetails` (#18111)
cf95d12257f is described below

commit cf95d12257f87f53d35f38b1f71b6d7ebe8c8594
Author: Rui Fu <[email protected]>
AuthorDate: Fri Nov 11 14:25:32 2022 +0800

    [fix][fn] fix function failed to start if no `typeClassName` provided in `FunctionDetails` (#18111)
    
    (cherry picked from commit 8ad7157c1d22195720d256391a32773ca0108b80)
---
 .../functions/runtime/JavaInstanceStarter.java     | 84 +++++++++++++++++++++-
 1 file changed, 83 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 5995b87b705..36d063f8f55 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -19,6 +19,8 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
@@ -33,6 +35,7 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
@@ -45,13 +48,13 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.common.util.Reflections;
-
 import java.lang.reflect.Type;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 
 @Slf4j
@@ -165,6 +168,7 @@ public class JavaInstanceStarter implements AutoCloseable {
             functionDetailsJsonString = functionDetailsJsonString.substring(0, 
functionDetailsJsonString.length() - 1);
         }
         JsonFormat.parser().merge(functionDetailsJsonString, 
functionDetailsBuilder);
+        inferringMissingTypeClassName(functionDetailsBuilder, 
functionInstanceClassLoader);
         Function.FunctionDetails functionDetails = 
functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
@@ -287,6 +291,84 @@ public class JavaInstanceStarter implements AutoCloseable {
         }
     }
 
+    private void 
inferringMissingTypeClassName(Function.FunctionDetails.Builder 
functionDetailsBuilder,
+                                               ClassLoader classLoader) throws 
ClassNotFoundException {
+        switch (functionDetailsBuilder.getComponentType()) {
+            case FUNCTION:
+                if ((functionDetailsBuilder.hasSource()
+                        && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())
+                        || (functionDetailsBuilder.hasSink()
+                        && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+                    Map<String, Object> userConfigs = new 
Gson().fromJson(functionDetailsBuilder.getUserConfig(),
+                            new TypeToken<Map<String, Object>>() {
+                            }.getType());
+                    boolean isWindowConfigPresent = 
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+                    String className = functionDetailsBuilder.getClassName();
+                    if (isWindowConfigPresent) {
+                        WindowConfig windowConfig = new Gson().fromJson(
+                                (new 
Gson().toJson(userConfigs.get(WindowConfig.WINDOW_CONFIG_KEY))),
+                                WindowConfig.class);
+                        className = 
windowConfig.getActualWindowFunctionClassName();
+                    }
+
+                    Class<?>[] typeArgs = 
FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+                            isWindowConfigPresent);
+                    if (functionDetailsBuilder.hasSource()
+                            && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
+                            && typeArgs[0] != null) {
+                        Function.SourceSpec.Builder sourceBuilder = 
functionDetailsBuilder.getSource().toBuilder();
+                        sourceBuilder.setTypeClassName(typeArgs[0].getName());
+                        
functionDetailsBuilder.setSource(sourceBuilder.build());
+                    }
+
+                    if (functionDetailsBuilder.hasSink()
+                            && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
+                            && typeArgs[1] != null) {
+                        Function.SinkSpec.Builder sinkBuilder = 
functionDetailsBuilder.getSink().toBuilder();
+                        sinkBuilder.setTypeClassName(typeArgs[1].getName());
+                        functionDetailsBuilder.setSink(sinkBuilder.build());
+                    }
+                }
+                break;
+            case SINK:
+                if ((functionDetailsBuilder.hasSink()
+                        && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+                    String typeArg = 
getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+                    Function.SinkSpec.Builder sinkBuilder =
+                            
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
+                    sinkBuilder.setTypeClassName(typeArg);
+                    functionDetailsBuilder.setSink(sinkBuilder);
+
+                    Function.SourceSpec sourceSpec = 
functionDetailsBuilder.getSource();
+                    if (null == sourceSpec || 
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+                        Function.SourceSpec.Builder sourceBuilder = 
Function.SourceSpec.newBuilder(sourceSpec);
+                        sourceBuilder.setTypeClassName(typeArg);
+                        functionDetailsBuilder.setSource(sourceBuilder);
+                    }
+                }
+                break;
+            case SOURCE:
+                if ((functionDetailsBuilder.hasSource()
+                        && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
+                    String typeArg = 
getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+                    Function.SourceSpec.Builder sourceBuilder =
+                            
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
+                    sourceBuilder.setTypeClassName(typeArg);
+                    functionDetailsBuilder.setSource(sourceBuilder);
+
+                    Function.SinkSpec sinkSpec = 
functionDetailsBuilder.getSink();
+                    if (null == sinkSpec || 
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+                        Function.SinkSpec.Builder sinkBuilder = 
Function.SinkSpec.newBuilder(sinkSpec);
+                        sinkBuilder.setTypeClassName(typeArg);
+                        functionDetailsBuilder.setSink(sinkBuilder);
+                    }
+                }
+                break;
+        }
+    }
+
 
     class InstanceControlImpl extends 
InstanceControlGrpc.InstanceControlImplBase {
         private RuntimeSpawner runtimeSpawner;

Reply via email to