This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f7c0b3c49c9 [improve][fn] Allow unknown fields in connectors config
(#20116)
f7c0b3c49c9 is described below
commit f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Apr 26 17:36:56 2023 +0200
[improve][fn] Allow unknown fields in connectors config (#20116)
---
conf/functions_worker.yml | 6 ++
.../pulsar/functions/instance/InstanceConfig.java | 1 +
.../functions/instance/JavaInstanceRunnable.java | 97 +++++++++++++++++--
.../instance/JavaInstanceRunnableTest.java | 104 ++++++++++++++++++---
.../functions/runtime/JavaInstanceStarter.java | 7 ++
.../pulsar/functions/runtime/RuntimeUtils.java | 6 +-
.../pulsar/functions/worker/WorkerConfig.java | 11 +++
.../pulsar/functions/worker/FunctionActioner.java | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 3 +-
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
.../resources/META-INF/services/pulsar-io.yaml | 1 +
14 files changed, 217 insertions(+), 24 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index bb15e0ca416..4c5b6aab1b7 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -407,6 +407,12 @@ validateConnectorConfig: false
# If it is set to true, you must ensure that it has been initialized by
"bin/pulsar initialize-cluster-metadata" command.
initializedDlogMetadata: false
+# Whether to ignore unknown properties when deserializing the connector
configuration.
+# After upgrading a connector to a new version with a new configuration, the
new configuration may not be compatible with the old connector.
+# In case of a rollback, the connector configuration must also be rolled back.
+# Ignoring unknown fields makes it possible to keep the new configuration and roll back only the connector.
+ignoreUnknownConfigFields: false
+
###########################
# Arbitrary Configuration
###########################
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 1a89505d9bb..fcee6d734d6 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -48,6 +48,7 @@ public class InstanceConfig {
private boolean exposePulsarAdminClientEnabled = false;
private int metricsPort;
private List<String> additionalJavaRuntimeArguments =
Collections.emptyList();
+ private boolean ignoreUnknownConfigFields;
/**
* Get the string representation of {@link #getInstanceId()}.
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e2ad9e4c989..c3f36f754da 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,13 +19,20 @@
package org.apache.pulsar.functions.instance;
import static
org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.BeanDeserializer;
import com.google.common.annotations.VisibleForTesting;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
@@ -34,6 +41,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +67,7 @@ import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -94,6 +103,7 @@ import
org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
@@ -855,10 +865,7 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
if (sourceSpec.getConfigs().isEmpty()) {
this.source.open(new HashMap<>(), contextImpl);
} else {
- this.source.open(
- ObjectMapperFactory.getMapper().reader().forType(new
TypeReference<Map<String, Object>>() {
- }).readValue(sourceSpec.getConfigs())
- , contextImpl);
+
this.source.open(parseComponentConfig(sourceSpec.getConfigs()), contextImpl);
}
if (this.source instanceof PulsarSource) {
contextImpl.setInputConsumers(((PulsarSource)
this.source).getInputConsumers());
@@ -870,6 +877,83 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
+ // Convenience overload: forwards the raw JSON config string to the static
+ // parseComponentConfig, passing instanceConfig, componentClassLoader and componentType
+ // as they are visible from this instance method.
+ private Map<String, Object> parseComponentConfig(String connectorConfigs)
throws IOException {
+ return parseComponentConfig(connectorConfigs, instanceConfig,
componentClassLoader, componentType);
+ }
+
+    /**
+     * Parses the JSON connector configuration into a map and, when requested, drops every entry
+     * that the connector's declared configuration class does not define.
+     *
+     * <p>Filtering only happens when {@code instanceConfig.isIgnoreUnknownConfigFields()} is true,
+     * the component class loader is a {@link NarClassLoader}, the component is a SOURCE or a SINK,
+     * and the connector definition names a config class for that component type. In every other
+     * case the parsed map is returned untouched.
+     *
+     * @param connectorConfigs     JSON object holding the connector configuration
+     * @param instanceConfig       instance settings; supplies the ignore-unknown-fields flag
+     * @param componentClassLoader class loader of the component; the connector definition is read
+     *                             from it when it is a {@link NarClassLoader}
+     * @param componentType        type of the component being configured
+     * @return the parsed configuration, possibly stripped of unknown fields
+     * @throws IOException      if the JSON cannot be parsed or the connector definition cannot be read
+     * @throws RuntimeException if the declared config class cannot be loaded
+     */
+    static Map<String, Object> parseComponentConfig(String connectorConfigs,
+                                                    InstanceConfig instanceConfig,
+                                                    ClassLoader componentClassLoader,
+                                                    org.apache.pulsar.functions.proto.Function
+                                                            .FunctionDetails.ComponentType componentType)
+            throws IOException {
+        final Map<String, Object> config = ObjectMapperFactory
+                .getMapper()
+                .reader()
+                .forType(new TypeReference<Map<String, Object>>() {})
+                .readValue(connectorConfigs);
+        if (instanceConfig.isIgnoreUnknownConfigFields() && componentClassLoader instanceof NarClassLoader) {
+            final String configClassName;
+            if (componentType
+                    == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                configClassName = ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSourceConfigClass();
+            } else if (componentType
+                    == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
+                configClassName = ConnectorUtils
+                        .getConnectorDefinition((NarClassLoader) componentClassLoader).getSinkConfigClass();
+            } else {
+                return config;
+            }
+            if (configClassName != null) {
+
+                final Class<?> configClass;
+                try {
+                    configClass = Class.forName(configClassName,
+                            true, Thread.currentThread().getContextClassLoader());
+                } catch (ClassNotFoundException e) {
+                    throw new RuntimeException("Config class not found: " + configClassName, e);
+                }
+                final List<String> allFields = BeanPropertiesReader.getBeanProperties(configClass);
+
+                // Remove through the key-set view's removeIf: calling config.remove(key) while
+                // iterating config.keySet() throws ConcurrentModificationException as soon as the
+                // removed key is not the last entry of the map.
+                config.keySet().removeIf(key -> {
+                    if (allFields.contains(key)) {
+                        return false;
+                    }
+                    log.error("Field '{}' not defined in the {} configuration {}, the field will be ignored",
+                            key,
+                            componentType,
+                            configClass);
+                    return true;
+                });
+            }
+        }
+        return config;
+    }
+
+ // Helper that lists the property names Jackson's BeanDeserializer reports for a class.
+ static final class BeanPropertiesReader {
+
+ // Single shared reader instance used by the static entry point below.
+ private static final MapperBeanReader reader = new MapperBeanReader();
+
+ // Extends ObjectMapper so the method below can call createDeserializationContext,
+ // _findRootDeserializer and _typeFactory unqualified
+ // (presumably because they are not publicly accessible -- TODO confirm against Jackson).
+ private static final class MapperBeanReader extends ObjectMapper {
+ // Returns the names of all properties exposed by the root deserializer of valueType.
+ // @SneakyThrows lets checked exceptions raised in the body propagate undeclared.
+ @SneakyThrows
+ List<String> getBeanProperties(Class<?> valueType) {
+ // Parser over an empty string: only used to build the deserialization context.
+ // NOTE(review): the parser is never closed in this method.
+ final JsonParser parser = ObjectMapperFactory
+ .getMapper()
+ .getObjectMapper()
+ .createParser("");
+ DeserializationConfig config = getDeserializationConfig();
+ DeserializationContext ctxt =
createDeserializationContext(parser, config);
+ // The unchecked cast fails with ClassCastException when the root deserializer
+ // found for valueType is not a BeanDeserializer.
+ BeanDeserializer deser = (BeanDeserializer)
+ _findRootDeserializer(ctxt,
_typeFactory.constructType(valueType));
+ List<String> list = new ArrayList<>();
+ deser.properties().forEachRemaining(p ->
list.add(p.getName()));
+ return list;
+ }
+ }
+
+ // Static entry point: delegates to the shared MapperBeanReader instance.
+ static List<String> getBeanProperties(Class<?> valueType) {
+ return reader.getBeanProperties(valueType);
+ }
+ }
+
private void setupOutput(ContextImpl contextImpl) throws Exception {
@@ -940,9 +1024,8 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
log.debug("Opening Sink with SinkSpec {} and contextImpl:
{} ", sinkSpec,
contextImpl.toString());
}
-
this.sink.open(ObjectMapperFactory.getMapper().reader().forType(
- new TypeReference<Map<String, Object>>() {
- }).readValue(sinkSpec.getConfigs()), contextImpl);
+ final Map<String, Object> config =
parseComponentConfig(sinkSpec.getConfigs());
+ this.sink.open(config, contextImpl);
}
} catch (Exception e) {
log.error("Sink open produced uncaught exception: ", e);
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index a36a3ca62d1..5fea8bcc9fd 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -18,19 +18,27 @@
*/
package org.apache.pulsar.functions.instance;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
@@ -38,11 +46,10 @@ import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.proto.Function.SinkSpecOrBuilder;
-import org.apache.pulsar.functions.proto.Function.SourceSpecOrBuilder;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.jetbrains.annotations.NotNull;
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class JavaInstanceRunnableTest {
@@ -159,7 +166,7 @@ public class JavaInstanceRunnableTest {
@NotNull
private JavaInstanceRunnable getJavaInstanceRunnable(boolean autoAck,
- org.apache.pulsar.functions.proto.Function.ProcessingGuarantees
processingGuarantees) throws Exception {
+
org.apache.pulsar.functions.proto.Function.ProcessingGuarantees
processingGuarantees) throws Exception {
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setAutoAck(autoAck)
.setProcessingGuarantees(processingGuarantees).build();
@@ -184,23 +191,90 @@ public class JavaInstanceRunnableTest {
@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
- SinkSpecOrBuilder sinkSpec = mock(SinkSpecOrBuilder.class);
- when(sinkSpec.getConfigs()).thenReturn("{\"ttl\":
9223372036854775807}");
- Map<String, Object> parsedConfig =
- new ObjectMapper().readValue(sinkSpec.getConfigs(), new
TypeReference<Map<String, Object>>() {
- });
+ final Map<String, Object> parsedConfig =
JavaInstanceRunnable.parseComponentConfig(
+ "{\"ttl\": 9223372036854775807}",
+ new InstanceConfig(),
+ null,
+ FunctionDetails.ComponentType.SINK
+ );
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}
@Test
public void testSourceConfigParsingPreservesOriginalType() throws
Exception {
- SourceSpecOrBuilder sourceSpec = mock(SourceSpecOrBuilder.class);
- when(sourceSpec.getConfigs()).thenReturn("{\"ttl\":
9223372036854775807}");
- Map<String, Object> parsedConfig =
- new ObjectMapper().readValue(sourceSpec.getConfigs(), new
TypeReference<Map<String, Object>>() {
- });
+ final Map<String, Object> parsedConfig =
JavaInstanceRunnable.parseComponentConfig(
+ "{\"ttl\": 9223372036854775807}",
+ new InstanceConfig(),
+ null,
+ FunctionDetails.ComponentType.SOURCE
+ );
Assert.assertEquals(parsedConfig.get("ttl").getClass(), Long.class);
Assert.assertEquals(parsedConfig.get("ttl"), Long.MAX_VALUE);
}
+
+
+ // Minimal connector config fixture declaring a single public field, "field1".
+ public static class ConnectorTestConfig1 {
+ public String field1;
+ }
+
+ // Each row is {ignoreUnknownConfigFields flag, component type}:
+ // both flag values for SINK, then both flag values for SOURCE.
+ @DataProvider(name = "configIgnoreUnknownFields")
+ public static Object[][] configIgnoreUnknownFields() {
+ return new Object[][]{
+ {false, FunctionDetails.ComponentType.SINK},
+ {true, FunctionDetails.ComponentType.SINK},
+ {false, FunctionDetails.ComponentType.SOURCE},
+ {true, FunctionDetails.ComponentType.SOURCE}
+ };
+ }
+
+ // Verifies that parseComponentConfig drops "field2" (absent from ConnectorTestConfig1)
+ // only when ignoreUnknownConfigFields is true. Despite the method name, the data
+ // provider runs it for both SINK and SOURCE component types.
+ // NOTE(review): the unknown key "field2" is the last entry of the JSON input; an input
+ // where an unknown key precedes a known one is not covered here.
+ @Test(dataProvider = "configIgnoreUnknownFields")
+ public void testSinkConfigIgnoreUnknownFields(boolean
ignoreUnknownConfigFields,
+
FunctionDetails.ComponentType type) throws Exception {
+ NarClassLoader narClassLoader = mock(NarClassLoader.class);
+ final ConnectorDefinition connectorDefinition = new
ConnectorDefinition();
+ // Declare ConnectorTestConfig1 as the config class for the component type under test.
+ if (type == FunctionDetails.ComponentType.SINK) {
+
connectorDefinition.setSinkConfigClass(ConnectorTestConfig1.class.getName());
+ } else {
+
connectorDefinition.setSourceConfigClass(ConnectorTestConfig1.class.getName());
+ }
+ // The mocked class loader returns the serialized connector definition for any lookup.
+
when(narClassLoader.getServiceDefinition(any())).thenReturn(ObjectMapperFactory
+ .getMapper().writer().writeValueAsString(connectorDefinition));
+ final InstanceConfig instanceConfig = new InstanceConfig();
+ instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
+
+ final Map<String, Object> parsedConfig =
JavaInstanceRunnable.parseComponentConfig(
+ "{\"field1\": \"value\", \"field2\": \"value2\"}",
+ instanceConfig,
+ narClassLoader,
+ type
+ );
+ if (ignoreUnknownConfigFields) {
+ Assert.assertEquals(parsedConfig.size(), 1);
+ Assert.assertEquals(parsedConfig.get("field1"), "value");
+ } else {
+ Assert.assertEquals(parsedConfig.size(), 2);
+ Assert.assertEquals(parsedConfig.get("field1"), "value");
+ Assert.assertEquals(parsedConfig.get("field2"), "value2");
+ }
+ }
+
+ // Fixture mixing a public static field, a public instance field, a private field with a
+ // getter, and a @JsonIgnore-annotated private field. testBeanPropertiesReader expects
+ // only "field1" and "withGetter" to be reported as bean properties.
+ public static class ConnectorTestConfig2 {
+ public static int constantField = 1;
+ public String field1;
+ private long withGetter;
+ @JsonIgnore
+ private ConnectorTestConfig1 ignore;
+
+ public long getWithGetter() {
+ return withGetter;
+ }
+ }
+
+ // Checks that BeanPropertiesReader reports exactly "field1" and "withGetter" for
+ // ConnectorTestConfig2; both sides are wrapped in TreeSet so ordering does not matter.
+ @Test
+ public void testBeanPropertiesReader() throws Exception {
+ final List<String> beanProperties =
JavaInstanceRunnable.BeanPropertiesReader
+ .getBeanProperties(ConnectorTestConfig2.class);
+ Assert.assertEquals(new TreeSet<>(beanProperties), new
TreeSet<>(Arrays.asList("field1", "withGetter")));
+ }
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index deff690815d..c4f44be3df3 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -150,6 +150,12 @@ public class JavaInstanceStarter implements AutoCloseable {
+ "exposed to function context, default is disabled.", required =
false)
public Boolean exposePulsarAdminClientEnabled = false;
+ @Parameter(names = "--ignore_unknown_config_fields",
+ description = "Whether to ignore unknown properties when
deserializing the connector configuration.",
+ required = false)
+ public Boolean ignoreUnknownConfigFields = false;
+
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
@@ -177,6 +183,7 @@ public class JavaInstanceStarter implements AutoCloseable {
instanceConfig.setClusterName(clusterName);
instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
instanceConfig.setExposePulsarAdminClientEnabled(exposePulsarAdminClientEnabled);
+ instanceConfig.setIgnoreUnknownConfigFields(ignoreUnknownConfigFields);
Function.FunctionDetails.Builder functionDetailsBuilder =
Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 53ebfcbfaf0..5392697e928 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -435,10 +435,14 @@ public class RuntimeUtils {
args.add("--metrics_port");
args.add(String.valueOf(instanceConfig.getMetricsPort()));
- // only the Java instance supports --pending_async_requests right now.
+ // params supported only by the Java instance runtime.
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
args.add("--pending_async_requests");
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));
+
+ if (instanceConfig.isIgnoreUnknownConfigFields()) {
+ args.add("--ignore_unknown_config_fields");
+ }
}
// state storage configs
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 3b8ddf774d1..0ed73953d7a 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -738,6 +738,17 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
)
private List<String> additionalJavaRuntimeArguments = new ArrayList<>();
+ @FieldContext(
+ category = CATEGORY_CONNECTORS,
+ doc = "Whether to ignore unknown properties when deserializing the
connector configuration. "
+ + "After upgrading a connector to a new version with a new
configuration, "
+ + "the new configuration may not be compatible with the
old connector. "
+ + "In case of rollback, it's required to also rollback the
connector configuration. "
+ + "Ignoring unknown fields makes possible to keep the new
configuration and "
+ + "only rollback the connector."
+ )
+ private boolean ignoreUnknownConfigFields = false;
+
public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace,
functionMetadataTopicName);
}
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index c587a8a7348..03c6eb79218 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -221,6 +221,7 @@ public class FunctionActioner {
if (workerConfig.getAdditionalJavaRuntimeArguments() != null) {
instanceConfig.setAdditionalJavaRuntimeArguments(workerConfig.getAdditionalJavaRuntimeArguments());
}
+
instanceConfig.setIgnoreUnknownConfigFields(workerConfig.isIgnoreUnknownConfigFields());
return instanceConfig;
}
diff --git
a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
index 02314241f28..2fc04034f65 100644
--- a/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/alluxio/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -18,4 +18,5 @@
#
name: alluxio
description: Writes data into Alluxio
-sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
\ No newline at end of file
+sinkClass: org.apache.pulsar.io.alluxio.sink.AlluxioSink
+sinkConfigClass: org.apache.pulsar.io.alluxio.sink.AlluxioSinkConfig
\ No newline at end of file
diff --git
a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
index dd6ba976adb..50cad346b3e 100644
--- a/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/dynamodb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: dynamodb
description: DynamoDB connectors
sourceClass: org.apache.pulsar.io.dynamodb.DynamoDBSource
+sourceConfigClass: org.apache.pulsar.io.dynamodb.DynamoDBSourceConfig
diff --git
a/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
index 1b45638dd4d..4b80907ee86 100644
---
a/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
+++
b/pulsar-io/jdbc/clickhouse/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: jdbc-clickhouse
description: JDBC sink for ClickHouse
sinkClass: org.apache.pulsar.io.jdbc.ClickHouseJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git
a/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
index 463c8cba162..81dd7277e1f 100644
--- a/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/mariadb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: jdbc-mariadb
description: JDBC sink for MariaDB
sinkClass: org.apache.pulsar.io.jdbc.MariadbJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git
a/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
index f6262df6997..38c120f0893 100644
---
a/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
+++
b/pulsar-io/jdbc/openmldb/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: jdbc-openmldb
description: JDBC sink for OpenMLDB
sinkClass: org.apache.pulsar.io.jdbc.OpenMLDBJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
diff --git
a/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
index a8d192a4568..282ac8a0230 100644
---
a/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
+++
b/pulsar-io/jdbc/postgres/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: jdbc-postgres
description: JDBC sink for PostgreSQL
sinkClass: org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink
+sinkConfigClass: org.apache.pulsar.io.jdbc.JdbcSinkConfig
\ No newline at end of file