This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 4fef6262 [AURON #2059] Introduce Auron Native Kafka
DynamicTableFactory (#2078)
4fef6262 is described below
commit 4fef6262f416bef6e311151f7e2d69fce7364e2b
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 9 19:02:45 2026 +0800
[AURON #2059] Introduce Auron Native Kafka DynamicTableFactory (#2078)
# Which issue does this PR close?
Closes #2059
# Rationale for this change
* Introduce Auron Kafka DynamicTableFactory
# What changes are included in this PR?
* add AuronKafkaDynamicTableFactory
* add AuronKafkaDynamicTableSource
* add KafkaConstants
# Are there any user-facing changes?
* No
# How was this patch tested?
* tested via UT: AuronKafkaDynamicTableFactoryTest
---
.../kafka/AuronKafkaDynamicTableFactory.java | 178 ++++++++++++++++
.../kafka/AuronKafkaDynamicTableSource.java | 81 +++++++
.../flink/connector/kafka/KafkaConstants.java | 235 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 17 ++
.../kafka/AuronKafkaDynamicTableFactoryTest.java | 41 ++++
.../org.apache.flink.table.factories.Factory | 17 ++
6 files changed, 569 insertions(+)
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
new file mode 100644
index 00000000..ce9c09a3
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DynamicTableSourceFactory} for creating configured instances of
{@link AuronKafkaDynamicTableSource}.
+ */
+public class AuronKafkaDynamicTableFactory implements
DynamicTableSourceFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(AuronKafkaDynamicTableFactory.class);
+ public static final String IDENTIFIER = "auron-kafka";
+ private static final String PROPERTIES_PREFIX = "properties.";
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
ConfigOptions.key("properties.bootstrap.servers")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Required Kafka server connection string");
+ public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The topic to read from or write to.");
+ public static final ConfigOption<String> PROPS_GROUP_ID =
ConfigOptions.key("properties.group.id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Required consumer group in Kafka consumer, no
need for Kafka producer");
+ public static final ConfigOption<String> PB_DESC_FILE =
ConfigOptions.key("pb.desc.filename")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The filename of the pb descriptor file.");
+
+ public static final ConfigOption<String> PB_ROOT_MESSAGE_NAME =
ConfigOptions.key("pb.root.message")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The root message name of the pb.");
+
+ public static final ConfigOption<Integer> BUFFER_SIZE =
ConfigOptions.key("buffer.size")
+ .intType()
+ .defaultValue(3000)
+ .withDescription("kafka message records buffer size.");
+
+ public static final ConfigOption<String> NESTED_COLS_FIELD_MAPPING =
ConfigOptions.key("nested.cols.field.mapping")
+ .stringType()
+ .defaultValue("{}")
+ .withDescription(
+ "When certain fields within complex nested structures need
to be used at the top level of a Flink Table, this configuration must be
specified."
+ + "The JSON key corresponds to the field name in
the Flink table, while the value represents nested fields using the a.b.c
notation.");
+
+ public static final ConfigOption<String> PB_SKIP_FIELDS =
ConfigOptions.key("pb.skip.fields")
+ .stringType()
+ .defaultValue("")
+ .withDescription("Protobuf fields to skip when deserializing. The
format is: field1,field2,field3");
+
+ public static final ConfigOption<String> START_UP_MODE =
ConfigOptions.key("start-up.mode")
+ .stringType()
+ .defaultValue("GROUP_OFFSET")
+ .withDescription(
+ "offset mode for kafka source, support GROUP_OFFSET,
LATEST, EARLIEST, TIMESTAMP will be supported.");
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ final ReadableConfig tableOptions = helper.getOptions();
+ try {
+ String kafkaPropertiesJson = mapper.writeValueAsString(
+
getKafkaProperties(context.getCatalogTable().getOptions()));
+ Map<String, String> formatConfig = new HashMap<>();
+ String format = tableOptions.getOptional(FactoryUtil.FORMAT).get();
+ formatConfig.put(KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD,
tableOptions.get(NESTED_COLS_FIELD_MAPPING));
+ if (KAFKA_FORMAT_PROTOBUF.equalsIgnoreCase(format)) {
+ formatConfig.put(KAFKA_PB_FORMAT_PB_DESC_FILE_FIELD,
tableOptions.get(PB_DESC_FILE));
+ formatConfig.put(KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD,
tableOptions.get(PB_ROOT_MESSAGE_NAME));
+ formatConfig.put(KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD,
tableOptions.get(PB_SKIP_FIELDS));
+ }
+ String formatConfigJson = mapper.writeValueAsString(formatConfig);
+ return new AuronKafkaDynamicTableSource(
+
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
+ tableOptions.get(TOPIC),
+ kafkaPropertiesJson,
+ format,
+ formatConfigJson,
+ tableOptions.get(BUFFER_SIZE),
+ tableOptions.get(START_UP_MODE),
+ tableOptions.get(NESTED_COLS_FIELD_MAPPING));
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Could not create Auron Kafka
dynamic table source", e);
+ }
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ options.add(PROPS_GROUP_ID);
+ options.add(TOPIC);
+ options.add(FactoryUtil.FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FactoryUtil.FORMAT);
+ options.add(TOPIC);
+ options.add(PROPS_GROUP_ID);
+ options.add(PROPS_BOOTSTRAP_SERVERS);
+ options.add(PB_DESC_FILE);
+ options.add(PB_ROOT_MESSAGE_NAME);
+ options.add(BUFFER_SIZE);
+ options.add(NESTED_COLS_FIELD_MAPPING);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> forwardOptions() {
+ return Stream.of(PROPS_BOOTSTRAP_SERVERS, PROPS_GROUP_ID,
TOPIC).collect(Collectors.toSet());
+ }
+
+ public static Properties getKafkaProperties(Map<String, String>
tableOptions) {
+ final Properties kafkaProperties = new Properties();
+
+ if (hasKafkaClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(key -> {
+ final String value = tableOptions.get(key);
+ final String subKey =
key.substring((PROPERTIES_PREFIX).length());
+ if (KAFKA_PROPERTIES_WHITE_LIST.contains(subKey)) {
+ kafkaProperties.put(subKey, value);
+ }
+ });
+ }
+ return kafkaProperties;
+ }
+
+ /**
+ * Decides whether the table options contain Kafka client properties, i.e. keys that start
+ * with the {@code "properties."} prefix.
+ */
+ private static boolean hasKafkaClientProperties(Map<String, String>
tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k ->
k.startsWith(PROPERTIES_PREFIX));
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
new file mode 100644
index 00000000..80d536b7
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link DynamicTableSource} for Auron Kafka.
+ */
+public class AuronKafkaDynamicTableSource implements ScanTableSource {
+
+ private final DataType physicalDataType;
+ private final String kafkaTopic;
+ private final String kafkaPropertiesJson;
+ private final String format;
+ private final String formatConfigJson;
+ private final int bufferSize;
+ private final String startupMode;
+ private final String nestedColsMappingJson;
+
+ public AuronKafkaDynamicTableSource(
+ DataType physicalDataType,
+ String kafkaTopic,
+ String kafkaPropertiesJson,
+ String format,
+ String formatConfigJson,
+ int bufferSize,
+ String startupMode,
+ String nestedColsMappingJson) {
+ final LogicalType physicalType = physicalDataType.getLogicalType();
+ Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row
data type expected.");
+ this.physicalDataType = physicalDataType;
+ this.kafkaTopic = kafkaTopic;
+ this.kafkaPropertiesJson = kafkaPropertiesJson;
+ this.format = format;
+ this.formatConfigJson = formatConfigJson;
+ this.bufferSize = bufferSize;
+ this.startupMode = startupMode;
+ this.nestedColsMappingJson = nestedColsMappingJson;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext)
{
+ return null;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return null;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Auron Kafka Dynamic Table Source";
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
new file mode 100644
index 00000000..d733ccbe
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Constants shared by the Auron Kafka connector.
+ */
+public class KafkaConstants {
+
+ /**
+ * The list of Kafka properties that are allowed to be passed to the consumer.
+ * rdkafka validates the properties it receives, so properties outside this list are filtered out to prevent Auron from reporting errors.
+ */
+ public static final List<String> KAFKA_PROPERTIES_WHITE_LIST =
Arrays.asList(
+ "builtin.features",
+ "client.id",
+ "metadata.broker.list",
+ "bootstrap.servers",
+ "message.max.bytes",
+ "message.copy.max.bytes",
+ "receive.message.max.bytes",
+ "max.in.flight.requests.per.connection",
+ "max.in.flight",
+ "metadata.recovery.strategy",
+ "metadata.recovery.rebootstrap.trigger.ms",
+ "topic.metadata.refresh.interval.ms",
+ "metadata.max.age.ms",
+ "topic.metadata.refresh.fast.interval.ms",
+ "topic.metadata.refresh.fast.cnt",
+ "topic.metadata.refresh.sparse",
+ "topic.metadata.propagation.max.ms",
+ "topic.blacklist",
+ "debug",
+ "socket.timeout.ms",
+ "socket.blocking.max.ms",
+ "socket.send.buffer.bytes",
+ "socket.receive.buffer.bytes",
+ "socket.keepalive.enable",
+ "socket.nagle.disable",
+ "socket.max.fails",
+ "broker.address.ttl",
+ "broker.address.family",
+ "socket.connection.setup.timeout.ms",
+ "connections.max.idle.ms",
+ "reconnect.backoff.jitter.ms",
+ "reconnect.backoff.ms",
+ "reconnect.backoff.max.ms",
+ "statistics.interval.ms",
+ "enabled_events",
+ "error_cb",
+ "throttle_cb",
+ "stats_cb",
+ "log_cb",
+ "log_level",
+ "log.queue",
+ "log.thread.name",
+ "enable.random.seed",
+ "log.connection.close",
+ "background_event_cb",
+ "socket_cb",
+ "connect_cb",
+ "closesocket_cb",
+ "open_cb",
+ "resolve_cb",
+ "opaque",
+ "default_topic_conf",
+ "internal.termination.signal",
+ "api.version.request",
+ "api.version.request.timeout.ms",
+ "api.version.fallback.ms",
+ "broker.version.fallback",
+ "allow.auto.create.topics",
+ "security.protocol",
+ "ssl.cipher.suites",
+ "ssl.curves.list",
+ "ssl.sigalgs.list",
+ "ssl.key.location",
+ "ssl.key.password",
+ "ssl.key.pem",
+ "ssl_key",
+ "ssl.certificate.location",
+ "ssl.certificate.pem",
+ "ssl_certificate",
+ "ssl.ca.location",
+ "https.ca.location",
+ "https.ca.pem",
+ "ssl.ca.pem",
+ "ssl_ca",
+ "ssl.ca.certificate.stores",
+ "ssl.crl.location",
+ "ssl.keystore.location",
+ "ssl.keystore.password",
+ "ssl.providers",
+ "ssl.engine.location",
+ "ssl.engine.id",
+ "ssl_engine_callback_data",
+ "enable.ssl.certificate.verification",
+ "ssl.endpoint.identification.algorithm",
+ "ssl.certificate.verify_cb",
+ "sasl.mechanisms",
+ "sasl.mechanism",
+ "sasl.kerberos.service.name",
+ "sasl.kerberos.principal",
+ "sasl.kerberos.kinit.cmd",
+ "sasl.kerberos.keytab",
+ "sasl.kerberos.min.time.before.relogin",
+ "sasl.username",
+ "sasl.password",
+ "sasl.oauthbearer.config",
+ "enable.sasl.oauthbearer.unsecure.jwt",
+ "oauthbearer_token_refresh_cb",
+ "sasl.oauthbearer.method",
+ "sasl.oauthbearer.client.id",
+ "sasl.oauthbearer.client.credentials.client.id",
+ "sasl.oauthbearer.client.credentials.client.secret",
+ "sasl.oauthbearer.client.secret",
+ "sasl.oauthbearer.scope",
+ "sasl.oauthbearer.extensions",
+ "sasl.oauthbearer.token.endpoint.url",
+ "sasl.oauthbearer.grant.type",
+ "sasl.oauthbearer.assertion.algorithm",
+ "sasl.oauthbearer.assertion.private.key.file",
+ "sasl.oauthbearer.assertion.private.key.passphrase",
+ "sasl.oauthbearer.assertion.private.key.pem",
+ "sasl.oauthbearer.assertion.file",
+ "sasl.oauthbearer.assertion.claim.aud",
+ "sasl.oauthbearer.assertion.claim.exp.seconds",
+ "sasl.oauthbearer.assertion.claim.iss",
+ "sasl.oauthbearer.assertion.claim.jti.include",
+ "sasl.oauthbearer.assertion.claim.nbf.seconds",
+ "sasl.oauthbearer.assertion.claim.sub",
+ "sasl.oauthbearer.assertion.jwt.template.file",
+ "sasl.oauthbearer.metadata.authentication.type",
+ "plugin.library.paths",
+ "interceptors",
+ "group.id",
+ "group.instance.id",
+ "partition.assignment.strategy",
+ "session.timeout.ms",
+ "heartbeat.interval.ms",
+ "group.protocol.type",
+ "group.protocol",
+ "group.remote.assignor",
+ "coordinator.query.interval.ms",
+ "max.poll.interval.ms",
+ "auto.commit.interval.ms",
+ "enable.auto.offset.store",
+ "queued.min.messages",
+ "queued.max.messages.kbytes",
+ "fetch.wait.max.ms",
+ "fetch.queue.backoff.ms",
+ "fetch.message.max.bytes",
+ "max.partition.fetch.bytes",
+ "fetch.max.bytes",
+ "fetch.min.bytes",
+ "fetch.error.backoff.ms",
+ "offset.store.method",
+ "isolation.level",
+ "consume_cb",
+ "rebalance_cb",
+ "offset_commit_cb",
+ "enable.partition.eof",
+ "check.crcs",
+ "client.rack",
+ "transactional.id",
+ "transaction.timeout.ms",
+ "enable.idempotence",
+ "enable.gapless.guarantee",
+ "queue.buffering.max.messages",
+ "queue.buffering.max.kbytes",
+ "queue.buffering.max.ms",
+ "linger.ms",
+ "message.send.max.retries",
+ "retries",
+ "retry.backoff.ms",
+ "retry.backoff.max.ms",
+ "queue.buffering.backpressure.threshold",
+ "compression.codec",
+ "compression.type",
+ "batch.num.messages",
+ "batch.size",
+ "delivery.report.only.error",
+ "dr_cb",
+ "dr_msg_cb",
+ "sticky.partitioning.linger.ms",
+ "client.dns.lookup",
+ "enable.metrics.push",
+ "request.required.acks",
+ "acks",
+ "request.timeout.ms",
+ "message.timeout.ms",
+ "delivery.timeout.ms",
+ "queuing.strategy",
+ "produce.offset.report",
+ "partitioner",
+ "partitioner_cb",
+ "msg_order_cmp",
+ "opaque",
+ "compression.codec",
+ "compression.type",
+ "compression.level",
+ "auto.commit.enable",
+ "enable.auto.commit",
+ "auto.commit.interval.ms",
+ "auto.offset.reset",
+ "offset.store.path",
+ "offset.store.sync.interval.ms",
+ "offset.store.method",
+ "consume.callback.max.messages");
+
+ public static final String KAFKA_FORMAT_PROTOBUF = "Protobuf";
+ public static final String KAFKA_FORMAT_JSON = "Json";
+
+ public static final String KAFKA_PB_FORMAT_PB_DESC_FILE_FIELD =
"pb_desc_file";
+ public static final String KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD =
"root_message_name";
+ public static final String KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD =
"skip_fields";
+ public static final String KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD =
"nested_col_mapping";
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/auron-flink-extension/auron-flink-runtime/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..2e075a36
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.auron.flink.connector.kafka.AuronKafkaDynamicTableFactory
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactoryTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactoryTest.java
new file mode 100644
index 00000000..b3004c95
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link AuronKafkaDynamicTableFactory}.
+ */
+public class AuronKafkaDynamicTableFactoryTest {
+
+ @Test
+ public void testDiscoveryTableFactory() {
+ DynamicTableSourceFactory factory = FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ DynamicTableSourceFactory.class,
+ AuronKafkaDynamicTableFactory.IDENTIFIER);
+ assertInstanceOf(
+ AuronKafkaDynamicTableFactory.class,
+ factory,
+ "The factory should be an instance of
AuronKafkaDynamicTableFactory.");
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/auron-flink-extension/auron-flink-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..2e075a36
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.auron.flink.connector.kafka.AuronKafkaDynamicTableFactory