eolivelli commented on code in PR #20116:
URL: https://github.com/apache/pulsar/pull/20116#discussion_r1174842575


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -870,6 +872,56 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
             Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
+    private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
+        return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);
+    }
+
+    static Map<String, Object> parseComponentConfig(String connectorConfigs,
+                                                    InstanceConfig instanceConfig,
+                                                    ClassLoader componentClassLoader,
+                                                    org.apache.pulsar.functions.proto.Function
+                                                            .FunctionDetails.ComponentType componentType)
+            throws IOException {
+        final Map<String, Object> config = ObjectMapperFactory
+                .getMapper()
+                .reader()
+                .forType(new TypeReference<Map<String, Object>>() {})
+                .readValue(connectorConfigs);
+        if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
+            final String configClass;
+            if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                configClass = ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
+            } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
+                configClass =  ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
+            } else {
+                return config;
+            }
+            if (configClass != null) {
+                final Object configInstance = Reflections.createInstance(configClass,
+                        Thread.currentThread().getContextClassLoader());
+                final List<String> allFields =

Review Comment:
   I am not sure that this is correct.
   
   ObjectMapper should follow Java Beans conventions and resolve properties through getters/setters as well as public fields, so the raw field names may not match the property names it accepts.
   We should use some ObjectMapper utilities here to make sure that we are doing the right thing.
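
To illustrate the concern, here is a minimal sketch that uses only the JDK. It is not the PR's code: `BeanPropertyNames` and `SampleConfig` are hypothetical names, and `java.beans.Introspector` stands in for Jackson's own property introspection, which may apply further rules (annotations, naming strategies). It shows that the declared field names and the names settable through Java Beans conventions can differ.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

public class BeanPropertyNames {

    // Hypothetical connector config class, for illustration only.
    public static class SampleConfig {
        private int size;              // exposed as property "batchSize" via accessors
        public String topic;           // public field without accessors
        private String internalState;  // private field with no setter

        public int getBatchSize() { return size; }
        public void setBatchSize(int batchSize) { this.size = batchSize; }
    }

    // Names settable under Java Beans conventions: setters plus public,
    // non-static, non-final fields.
    static Set<String> settableProperties(Class<?> type) throws IntrospectionException {
        Set<String> names = new TreeSet<>();
        BeanInfo info = Introspector.getBeanInfo(type, Object.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getWriteMethod() != null) {
                names.add(pd.getName());
            }
        }
        for (Field f : type.getFields()) {
            int mods = f.getModifiers();
            if (!Modifier.isStatic(mods) && !Modifier.isFinal(mods)) {
                names.add(f.getName());
            }
        }
        return names;
    }

    // Names obtained by listing declared fields, regardless of visibility.
    static Set<String> declaredFieldNames(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Field f : type.getDeclaredFields()) {
            if (!f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        // prints [internalState, size, topic]
        System.out.println(declaredFieldNames(SampleConfig.class));
        // prints [batchSize, topic]
        System.out.println(settableProperties(SampleConfig.class));
    }
}
```

With a field-name check, a valid `batchSize` key would be reported as unknown, while `size` and `internalState` would be treated as known even though nothing can set them through the bean.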



##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java:
##########
@@ -870,6 +872,56 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
             Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
+    private Map<String, Object> parseComponentConfig(String connectorConfigs) throws IOException {
+        return parseComponentConfig(connectorConfigs, instanceConfig, componentClassLoader, componentType);
+    }
+
+    static Map<String, Object> parseComponentConfig(String connectorConfigs,
+                                                    InstanceConfig instanceConfig,
+                                                    ClassLoader componentClassLoader,
+                                                    org.apache.pulsar.functions.proto.Function
+                                                            .FunctionDetails.ComponentType componentType)
+            throws IOException {
+        final Map<String, Object> config = ObjectMapperFactory
+                .getMapper()
+                .reader()
+                .forType(new TypeReference<Map<String, Object>>() {})
+                .readValue(connectorConfigs);
+        if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
+            final String configClass;
+            if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                configClass = ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
+            } else if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
+                configClass =  ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
+            } else {
+                return config;
+            }
+            if (configClass != null) {
+                final Object configInstance = Reflections.createInstance(configClass,
+                        Thread.currentThread().getContextClassLoader());
+                final List<String> allFields =
+                        Reflections
+                                .getAllFields(configInstance.getClass())
+                                .stream()
+                                .map(Field::getName)
+                                .collect(Collectors.toList());
+
+                for (String s : config.keySet()) {
+                    if (!allFields.contains(s)) {
+                        log.warn("Field '{}' not defined in the {} configuration {}, the field will be ignored",

Review Comment:
   Maybe this should be logged at ERROR level; WARNINGs tend to be ignored by alerting systems.



##########
pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java:
##########
@@ -738,6 +738,17 @@ public String getFunctionAuthProviderClassName() {
     )
     private List<String> additionalJavaRuntimeArguments = new ArrayList<>();
 
+    @FieldContext(
+            category = CATEGORY_CONNECTORS,
+            doc = "Whether to ignore unknown properties when deserializing the connector configuration. "
+                    + "After upgrading a connector to a new version with a new configuration, "
+                    + "the new configuration may not be compatible with the old connector. "
+                    + "In case of rollback, it's required to also rollback the connector configuration. "
+                    + "Ignoring unknown fields makes possible to keep the new configuration and "
+                    + "only rollback the connector."
+    )
+    private boolean ignoreUnknownConfigFields = false;

Review Comment:
   This configuration applies to the function/connector instances, not to the function worker (which already ignores unknown fields).
   
   What about 'functionsIgnoreUnknownConfigFields'? (Maybe we can do better, but connectors are actually functions.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
