This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 82e92ba7851 [fix][fn] fix function failed to start if no
`typeClassName` provided in `FunctionDetails` (#18111)
82e92ba7851 is described below
commit 82e92ba7851199fb36835abf24c3ea7686b6c888
Author: Rui Fu <[email protected]>
AuthorDate: Fri Nov 11 14:25:32 2022 +0800
[fix][fn] fix function failed to start if no `typeClassName` provided in
`FunctionDetails` (#18111)
(cherry picked from commit 8ad7157c1d22195720d256391a32773ca0108b80)
---
.../functions/runtime/JavaInstanceStarter.java | 115 ++++++++++++++++++---
1 file changed, 99 insertions(+), 16 deletions(-)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index e65a8e254e2..dc1fcb4f603 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.functions.runtime;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
@@ -37,6 +39,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -49,6 +52,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.FunctionCommon;
@Slf4j
@@ -171,6 +175,7 @@ public class JavaInstanceStarter implements AutoCloseable {
functionDetailsJsonString = functionDetailsJsonString.substring(0,
functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString,
functionDetailsBuilder);
+ inferringMissingTypeClassName(functionDetailsBuilder,
functionInstanceClassLoader);
Function.FunctionDetails functionDetails =
functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
@@ -247,16 +252,16 @@ public class JavaInstanceStarter implements AutoCloseable
{
if (expectedHealthCheckInterval > 0) {
healthCheckTimer =
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(()
-> {
- try {
- if (System.currentTimeMillis() - lastHealthCheckTs
- > 3 * expectedHealthCheckInterval * 1000) {
- log.info("Haven't received health check from spawner
in a while. Stopping instance...");
- close();
- }
- } catch (Exception e) {
- log.error("Error occurred when checking for latest health
check", e);
- }
- }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval
* 1000, TimeUnit.MILLISECONDS);
+ try {
+ if (System.currentTimeMillis() - lastHealthCheckTs
+ > 3 * expectedHealthCheckInterval * 1000) {
+ log.info("Haven't received health check from
spawner in a while. Stopping instance...");
+ close();
+ }
+ } catch (Exception e) {
+ log.error("Error occurred when checking for latest
health check", e);
+ }
+ }, expectedHealthCheckInterval * 1000,
expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
}
runtimeSpawner.join();
@@ -294,6 +299,84 @@ public class JavaInstanceStarter implements AutoCloseable {
}
}
+ private void
inferringMissingTypeClassName(Function.FunctionDetails.Builder
functionDetailsBuilder,
+ ClassLoader classLoader) throws
ClassNotFoundException {
+ switch (functionDetailsBuilder.getComponentType()) {
+ case FUNCTION:
+ if ((functionDetailsBuilder.hasSource()
+ &&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())
+ || (functionDetailsBuilder.hasSink()
+ &&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+ Map<String, Object> userConfigs = new
Gson().fromJson(functionDetailsBuilder.getUserConfig(),
+ new TypeToken<Map<String, Object>>() {
+ }.getType());
+ boolean isWindowConfigPresent =
userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+ String className = functionDetailsBuilder.getClassName();
+ if (isWindowConfigPresent) {
+ WindowConfig windowConfig = new Gson().fromJson(
+ (new
Gson().toJson(userConfigs.get(WindowConfig.WINDOW_CONFIG_KEY))),
+ WindowConfig.class);
+ className =
windowConfig.getActualWindowFunctionClassName();
+ }
+
+ Class<?>[] typeArgs =
FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
+ isWindowConfigPresent);
+ if (functionDetailsBuilder.hasSource()
+ &&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
+ && typeArgs[0] != null) {
+ Function.SourceSpec.Builder sourceBuilder =
functionDetailsBuilder.getSource().toBuilder();
+ sourceBuilder.setTypeClassName(typeArgs[0].getName());
+
functionDetailsBuilder.setSource(sourceBuilder.build());
+ }
+
+ if (functionDetailsBuilder.hasSink()
+ &&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
+ && typeArgs[1] != null) {
+ Function.SinkSpec.Builder sinkBuilder =
functionDetailsBuilder.getSink().toBuilder();
+ sinkBuilder.setTypeClassName(typeArgs[1].getName());
+ functionDetailsBuilder.setSink(sinkBuilder.build());
+ }
+ }
+ break;
+ case SINK:
+ if ((functionDetailsBuilder.hasSink()
+ &&
functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
+ String typeArg =
getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+ Function.SinkSpec.Builder sinkBuilder =
+
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
+ sinkBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSink(sinkBuilder);
+
+ Function.SourceSpec sourceSpec =
functionDetailsBuilder.getSource();
+ if (null == sourceSpec ||
StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
+ Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(sourceSpec);
+ sourceBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSource(sourceBuilder);
+ }
+ }
+ break;
+ case SOURCE:
+ if ((functionDetailsBuilder.hasSource()
+ &&
functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
+ String typeArg =
getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+
+ Function.SourceSpec.Builder sourceBuilder =
+
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
+ sourceBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSource(sourceBuilder);
+
+ Function.SinkSpec sinkSpec =
functionDetailsBuilder.getSink();
+ if (null == sinkSpec ||
StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
+ Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(sinkSpec);
+ sinkBuilder.setTypeClassName(typeArg);
+ functionDetailsBuilder.setSink(sinkBuilder);
+ }
+ }
+ break;
+ }
+ }
+
class InstanceControlImpl extends
InstanceControlGrpc.InstanceControlImplBase {
private RuntimeSpawner runtimeSpawner;
@@ -319,8 +402,8 @@ public class JavaInstanceStarter implements AutoCloseable {
@Override
public void getAndResetMetrics(com.google.protobuf.Empty request,
-
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
- responseObserver) {
+
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
+ responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
@@ -336,8 +419,8 @@ public class JavaInstanceStarter implements AutoCloseable {
@Override
public void getMetrics(com.google.protobuf.Empty request,
-
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
- responseObserver) {
+
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
+ responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
@@ -368,8 +451,8 @@ public class JavaInstanceStarter implements AutoCloseable {
@Override
public void healthCheck(com.google.protobuf.Empty request,
-
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
- responseObserver) {
+
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
+ responseObserver) {
log.debug("Received health check request...");
InstanceCommunication.HealthCheckResult healthCheckResult =
InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();