This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 82e92ba7851 [fix][fn] fix function failed to start if no 
`typeClassName` provided in `FunctionDetails` (#18111)
82e92ba7851 is described below

commit 82e92ba7851199fb36835abf24c3ea7686b6c888
Author: Rui Fu <[email protected]>
AuthorDate: Fri Nov 11 14:25:32 2022 +0800

    [fix][fn] fix function failed to start if no `typeClassName` provided in 
`FunctionDetails` (#18111)
    
    (cherry picked from commit 8ad7157c1d22195720d256391a32773ca0108b80)
---
 .../functions/runtime/JavaInstanceStarter.java     | 115 ++++++++++++++++++---
 1 file changed, 99 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index e65a8e254e2..dc1fcb4f603 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.functions.runtime;
 
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
@@ -37,6 +39,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -49,6 +52,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 
 @Slf4j
@@ -171,6 +175,7 @@ public class JavaInstanceStarter implements AutoCloseable {
             functionDetailsJsonString = functionDetailsJsonString.substring(0, 
functionDetailsJsonString.length() - 1);
         }
         JsonFormat.parser().merge(functionDetailsJsonString, 
functionDetailsBuilder);
+        inferringMissingTypeClassName(functionDetailsBuilder, 
functionInstanceClassLoader);
         Function.FunctionDetails functionDetails = 
functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
@@ -247,16 +252,16 @@ public class JavaInstanceStarter implements AutoCloseable 
{
         if (expectedHealthCheckInterval > 0) {
             healthCheckTimer =
                     
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
 -> {
-                try {
-                    if (System.currentTimeMillis() - lastHealthCheckTs
-                            > 3 * expectedHealthCheckInterval * 1000) {
-                        log.info("Haven't received health check from spawner 
in a while. Stopping instance...");
-                        close();
-                    }
-                } catch (Exception e) {
-                    log.error("Error occurred when checking for latest health 
check", e);
-                }
-            }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval 
* 1000, TimeUnit.MILLISECONDS);
+                        try {
+                            if (System.currentTimeMillis() - lastHealthCheckTs
+                                    > 3 * expectedHealthCheckInterval * 1000) {
+                                log.info("Haven't received health check from 
spawner in a while. Stopping instance...");
+                                close();
+                            }
+                        } catch (Exception e) {
+                            log.error("Error occurred when checking for latest 
health check", e);
+                        }
+                    }, expectedHealthCheckInterval * 1000, 
expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
         }
 
         runtimeSpawner.join();
@@ -294,6 +299,84 @@ public class JavaInstanceStarter implements AutoCloseable {
         }
     }
 
+    private void 
inferringMissingTypeClassName(Function.FunctionDetails.Builder 
functionDetailsBuilder,
+                                               ClassLoader classLoader) throws 
ClassNotFoundException {
+        switch (functionDetailsBuilder.getComponentType()) {
+            case FUNCTION:
+                if ((functionDetailsBuilder.hasSource()
+                        && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())
+                        || (functionDetailsBuilder.hasSink()
+                        && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+                    Map<String, Object> userConfigs = new 
Gson().fromJson(functionDetailsBuilder.getUserConfig(),
+                            new TypeToken<Map<String, Object>>() {
+                            }.getType());
+                    boolean isWindowConfigPresent = 
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+                    String className = functionDetailsBuilder.getClassName();
+                    if (isWindowConfigPresent) {
+                        WindowConfig windowConfig = new Gson().fromJson(
+                                (new 
Gson().toJson(userConfigs.get(WindowConfig.WINDOW_CONFIG_KEY))),
+                                WindowConfig.class);
+                        className = 
windowConfig.getActualWindowFunctionClassName();
+                    }
+
+                    Class<?>[] typeArgs = 
FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+                            isWindowConfigPresent);
+                    if (functionDetailsBuilder.hasSource()
+                            && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
+                            && typeArgs[0] != null) {
+                        Function.SourceSpec.Builder sourceBuilder = 
functionDetailsBuilder.getSource().toBuilder();
+                        sourceBuilder.setTypeClassName(typeArgs[0].getName());
+                        
functionDetailsBuilder.setSource(sourceBuilder.build());
+                    }
+
+                    if (functionDetailsBuilder.hasSink()
+                            && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
+                            && typeArgs[1] != null) {
+                        Function.SinkSpec.Builder sinkBuilder = 
functionDetailsBuilder.getSink().toBuilder();
+                        sinkBuilder.setTypeClassName(typeArgs[1].getName());
+                        functionDetailsBuilder.setSink(sinkBuilder.build());
+                    }
+                }
+                break;
+            case SINK:
+                if ((functionDetailsBuilder.hasSink()
+                        && 
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+                    String typeArg = 
getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+                    Function.SinkSpec.Builder sinkBuilder =
+                            
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
+                    sinkBuilder.setTypeClassName(typeArg);
+                    functionDetailsBuilder.setSink(sinkBuilder);
+
+                    Function.SourceSpec sourceSpec = 
functionDetailsBuilder.getSource();
+                    if (null == sourceSpec || 
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+                        Function.SourceSpec.Builder sourceBuilder = 
Function.SourceSpec.newBuilder(sourceSpec);
+                        sourceBuilder.setTypeClassName(typeArg);
+                        functionDetailsBuilder.setSource(sourceBuilder);
+                    }
+                }
+                break;
+            case SOURCE:
+                if ((functionDetailsBuilder.hasSource()
+                        && 
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
+                    String typeArg = 
getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+                    Function.SourceSpec.Builder sourceBuilder =
+                            
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
+                    sourceBuilder.setTypeClassName(typeArg);
+                    functionDetailsBuilder.setSource(sourceBuilder);
+
+                    Function.SinkSpec sinkSpec = 
functionDetailsBuilder.getSink();
+                    if (null == sinkSpec || 
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+                        Function.SinkSpec.Builder sinkBuilder = 
Function.SinkSpec.newBuilder(sinkSpec);
+                        sinkBuilder.setTypeClassName(typeArg);
+                        functionDetailsBuilder.setSink(sinkBuilder);
+                    }
+                }
+                break;
+        }
+    }
+
 
     class InstanceControlImpl extends 
InstanceControlGrpc.InstanceControlImplBase {
         private RuntimeSpawner runtimeSpawner;
@@ -319,8 +402,8 @@ public class JavaInstanceStarter implements AutoCloseable {
 
         @Override
         public void getAndResetMetrics(com.google.protobuf.Empty request,
-            
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
-                    responseObserver) {
+                   
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
+                   responseObserver) {
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
@@ -336,8 +419,8 @@ public class JavaInstanceStarter implements AutoCloseable {
 
         @Override
         public void getMetrics(com.google.protobuf.Empty request,
-          
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
-                  responseObserver) {
+                   
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
+                   responseObserver) {
             Runtime runtime = runtimeSpawner.getRuntime();
             if (runtime != null) {
                 try {
@@ -368,8 +451,8 @@ public class JavaInstanceStarter implements AutoCloseable {
 
         @Override
         public void healthCheck(com.google.protobuf.Empty request,
-         
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
-                 responseObserver) {
+                
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
+                responseObserver) {
             log.debug("Received health check request...");
             InstanceCommunication.HealthCheckResult healthCheckResult =
                     
InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();

Reply via email to