This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fef6262 [AURON #2059] Introduce Auron Native Kafka 
DynamicTableFactory (#2078)
4fef6262 is described below

commit 4fef6262f416bef6e311151f7e2d69fce7364e2b
Author: zhangmang <[email protected]>
AuthorDate: Mon Mar 9 19:02:45 2026 +0800

    [AURON #2059] Introduce Auron Native Kafka DynamicTableFactory (#2078)
    
    # Which issue does this PR close?
    
    Closes #2059
    
    # Rationale for this change
    * Introduce Auron Kafka DynamicTableFactory
    
    # What changes are included in this PR?
    * add AuronKafkaDynamicTableFactory
    * add AuronKafkaDynamicTableSource
    * KafkaConstants
    
    # Are there any user-facing changes?
    * No
    
    # How was this patch tested?
    * test via UT: AuronKafkaDynamicTableFactoryTest
---
 .../kafka/AuronKafkaDynamicTableFactory.java       | 178 ++++++++++++++++
 .../kafka/AuronKafkaDynamicTableSource.java        |  81 +++++++
 .../flink/connector/kafka/KafkaConstants.java      | 235 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  17 ++
 .../kafka/AuronKafkaDynamicTableFactoryTest.java   |  41 ++++
 .../org.apache.flink.table.factories.Factory       |  17 ++
 6 files changed, 569 insertions(+)

diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
new file mode 100644
index 00000000..ce9c09a3
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import static org.apache.auron.flink.connector.kafka.KafkaConstants.*;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DynamicTableSourceFactory} for creating configured instances of 
{@link AuronKafkaDynamicTableSource}.
+ */
+public class AuronKafkaDynamicTableFactory implements 
DynamicTableSourceFactory {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AuronKafkaDynamicTableFactory.class);
+    public static final String IDENTIFIER = "auron-kafka";
+    private static final String PROPERTIES_PREFIX = "properties.";
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = 
ConfigOptions.key("properties.bootstrap.servers")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Required Kafka server connection string");
+    public static final ConfigOption<String> TOPIC = ConfigOptions.key("topic")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The topic to read from or write to.");
+    public static final ConfigOption<String> PROPS_GROUP_ID = 
ConfigOptions.key("properties.group.id")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Required consumer group in Kafka consumer, no 
need for Kafka producer");
+    public static final ConfigOption<String> PB_DESC_FILE = 
ConfigOptions.key("pb.desc.filename")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The filename of the pb descriptor file.");
+
+    public static final ConfigOption<String> PB_ROOT_MESSAGE_NAME = 
ConfigOptions.key("pb.root.message")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The root message name of the pb.");
+
+    public static final ConfigOption<Integer> BUFFER_SIZE = 
ConfigOptions.key("buffer.size")
+            .intType()
+            .defaultValue(3000)
+            .withDescription("kafka message records buffer size.");
+
+    public static final ConfigOption<String> NESTED_COLS_FIELD_MAPPING = 
ConfigOptions.key("nested.cols.field.mapping")
+            .stringType()
+            .defaultValue("{}")
+            .withDescription(
+                    "When certain fields within complex nested structures need 
to be used at the top level of a Flink Table, this configuration must be 
specified."
+                            + "The JSON key corresponds to the field name in 
the Flink table, while the value represents nested fields using the a.b.c 
notation.");
+
+    public static final ConfigOption<String> PB_SKIP_FIELDS = 
ConfigOptions.key("pb.skip.fields")
+            .stringType()
+            .defaultValue("")
+            .withDescription("Protobuf fields to skip when deserializing. The 
format is: field1,field2,field3");
+
+    public static final ConfigOption<String> START_UP_MODE = 
ConfigOptions.key("start-up.mode")
+            .stringType()
+            .defaultValue("GROUP_OFFSET")
+            .withDescription(
+                    "offset mode for kafka source, support GROUP_OFFSET, 
LATEST, EARLIEST, TIMESTAMP will be supported.");
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+        final ReadableConfig tableOptions = helper.getOptions();
+        try {
+            String kafkaPropertiesJson = mapper.writeValueAsString(
+                    
getKafkaProperties(context.getCatalogTable().getOptions()));
+            Map<String, String> formatConfig = new HashMap<>();
+            String format = tableOptions.getOptional(FactoryUtil.FORMAT).get();
+            formatConfig.put(KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD, 
tableOptions.get(NESTED_COLS_FIELD_MAPPING));
+            if (KAFKA_FORMAT_PROTOBUF.equalsIgnoreCase(format)) {
+                formatConfig.put(KAFKA_PB_FORMAT_PB_DESC_FILE_FIELD, 
tableOptions.get(PB_DESC_FILE));
+                formatConfig.put(KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD, 
tableOptions.get(PB_ROOT_MESSAGE_NAME));
+                formatConfig.put(KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD, 
tableOptions.get(PB_SKIP_FIELDS));
+            }
+            String formatConfigJson = mapper.writeValueAsString(formatConfig);
+            return new AuronKafkaDynamicTableSource(
+                    
context.getCatalogTable().getSchema().toPhysicalRowDataType(),
+                    tableOptions.get(TOPIC),
+                    kafkaPropertiesJson,
+                    format,
+                    formatConfigJson,
+                    tableOptions.get(BUFFER_SIZE),
+                    tableOptions.get(START_UP_MODE),
+                    tableOptions.get(NESTED_COLS_FIELD_MAPPING));
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Could not create Auron Kafka 
dynamic table source", e);
+        }
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        options.add(PROPS_GROUP_ID);
+        options.add(TOPIC);
+        options.add(FactoryUtil.FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FactoryUtil.FORMAT);
+        options.add(TOPIC);
+        options.add(PROPS_GROUP_ID);
+        options.add(PROPS_BOOTSTRAP_SERVERS);
+        options.add(PB_DESC_FILE);
+        options.add(PB_ROOT_MESSAGE_NAME);
+        options.add(BUFFER_SIZE);
+        options.add(NESTED_COLS_FIELD_MAPPING);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(PROPS_BOOTSTRAP_SERVERS, PROPS_GROUP_ID, 
TOPIC).collect(Collectors.toSet());
+    }
+
+    public static Properties getKafkaProperties(Map<String, String> 
tableOptions) {
+        final Properties kafkaProperties = new Properties();
+
+        if (hasKafkaClientProperties(tableOptions)) {
+            tableOptions.keySet().stream()
+                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+                    .forEach(key -> {
+                        final String value = tableOptions.get(key);
+                        final String subKey = 
key.substring((PROPERTIES_PREFIX).length());
+                        if (KAFKA_PROPERTIES_WHITE_LIST.contains(subKey)) {
+                            kafkaProperties.put(subKey, value);
+                        }
+                    });
+        }
+        return kafkaProperties;
+    }
+
+    /**
+     * Decides if the table options contains Kafka client properties that 
start with prefix
+     * 'properties'.
+     */
+    private static boolean hasKafkaClientProperties(Map<String, String> 
tableOptions) {
+        return tableOptions.keySet().stream().anyMatch(k -> 
k.startsWith(PROPERTIES_PREFIX));
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
new file mode 100644
index 00000000..80d536b7
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableSource.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link DynamicTableSource} for Auron Kafka.
+ */
+public class AuronKafkaDynamicTableSource implements ScanTableSource {
+
+    private final DataType physicalDataType;
+    private final String kafkaTopic;
+    private final String kafkaPropertiesJson;
+    private final String format;
+    private final String formatConfigJson;
+    private final int bufferSize;
+    private final String startupMode;
+    private final String nestedColsMappingJson;
+
+    public AuronKafkaDynamicTableSource(
+            DataType physicalDataType,
+            String kafkaTopic,
+            String kafkaPropertiesJson,
+            String format,
+            String formatConfigJson,
+            int bufferSize,
+            String startupMode,
+            String nestedColsMappingJson) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        Preconditions.checkArgument(physicalType.is(LogicalTypeRoot.ROW), "Row 
data type expected.");
+        this.physicalDataType = physicalDataType;
+        this.kafkaTopic = kafkaTopic;
+        this.kafkaPropertiesJson = kafkaPropertiesJson;
+        this.format = format;
+        this.formatConfigJson = formatConfigJson;
+        this.bufferSize = bufferSize;
+        this.startupMode = startupMode;
+        this.nestedColsMappingJson = nestedColsMappingJson;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) 
{
+        return null;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return null;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Auron Kafka Dynamic Table Source";
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
new file mode 100644
index 00000000..d733ccbe
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/KafkaConstants.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+
/**
 * Constants shared by the Auron native Kafka connector.
 *
 * <p>Holds the rdkafka client property white list plus format identifiers and the JSON field
 * names used when passing format configuration to the native layer.
 */
public class KafkaConstants {

    private KafkaConstants() {
        // Utility class: no instances.
    }

    /**
     * The list of Kafka properties that are allowed to be passed to the consumer.
     * rdkafka will validate properties. To prevent Auron from reporting errors, we will filter
     * kafka properties. Entries are deduplicated; order is not significant (only membership is
     * checked).
     */
    public static final List<String> KAFKA_PROPERTIES_WHITE_LIST = Arrays.asList(
            "builtin.features",
            "client.id",
            "metadata.broker.list",
            "bootstrap.servers",
            "message.max.bytes",
            "message.copy.max.bytes",
            "receive.message.max.bytes",
            "max.in.flight.requests.per.connection",
            "max.in.flight",
            "metadata.recovery.strategy",
            "metadata.recovery.rebootstrap.trigger.ms",
            "topic.metadata.refresh.interval.ms",
            "metadata.max.age.ms",
            "topic.metadata.refresh.fast.interval.ms",
            "topic.metadata.refresh.fast.cnt",
            "topic.metadata.refresh.sparse",
            "topic.metadata.propagation.max.ms",
            "topic.blacklist",
            "debug",
            "socket.timeout.ms",
            "socket.blocking.max.ms",
            "socket.send.buffer.bytes",
            "socket.receive.buffer.bytes",
            "socket.keepalive.enable",
            "socket.nagle.disable",
            "socket.max.fails",
            "broker.address.ttl",
            "broker.address.family",
            "socket.connection.setup.timeout.ms",
            "connections.max.idle.ms",
            "reconnect.backoff.jitter.ms",
            "reconnect.backoff.ms",
            "reconnect.backoff.max.ms",
            "statistics.interval.ms",
            "enabled_events",
            "error_cb",
            "throttle_cb",
            "stats_cb",
            "log_cb",
            "log_level",
            "log.queue",
            "log.thread.name",
            "enable.random.seed",
            "log.connection.close",
            "background_event_cb",
            "socket_cb",
            "connect_cb",
            "closesocket_cb",
            "open_cb",
            "resolve_cb",
            "opaque",
            "default_topic_conf",
            "internal.termination.signal",
            "api.version.request",
            "api.version.request.timeout.ms",
            "api.version.fallback.ms",
            "broker.version.fallback",
            "allow.auto.create.topics",
            "security.protocol",
            "ssl.cipher.suites",
            "ssl.curves.list",
            "ssl.sigalgs.list",
            "ssl.key.location",
            "ssl.key.password",
            "ssl.key.pem",
            "ssl_key",
            "ssl.certificate.location",
            "ssl.certificate.pem",
            "ssl_certificate",
            "ssl.ca.location",
            "https.ca.location",
            "https.ca.pem",
            "ssl.ca.pem",
            "ssl_ca",
            "ssl.ca.certificate.stores",
            "ssl.crl.location",
            "ssl.keystore.location",
            "ssl.keystore.password",
            "ssl.providers",
            "ssl.engine.location",
            "ssl.engine.id",
            "ssl_engine_callback_data",
            "enable.ssl.certificate.verification",
            "ssl.endpoint.identification.algorithm",
            "ssl.certificate.verify_cb",
            "sasl.mechanisms",
            "sasl.mechanism",
            "sasl.kerberos.service.name",
            "sasl.kerberos.principal",
            "sasl.kerberos.kinit.cmd",
            "sasl.kerberos.keytab",
            "sasl.kerberos.min.time.before.relogin",
            "sasl.username",
            "sasl.password",
            "sasl.oauthbearer.config",
            "enable.sasl.oauthbearer.unsecure.jwt",
            "oauthbearer_token_refresh_cb",
            "sasl.oauthbearer.method",
            "sasl.oauthbearer.client.id",
            "sasl.oauthbearer.client.credentials.client.id",
            "sasl.oauthbearer.client.credentials.client.secret",
            "sasl.oauthbearer.client.secret",
            "sasl.oauthbearer.scope",
            "sasl.oauthbearer.extensions",
            "sasl.oauthbearer.token.endpoint.url",
            "sasl.oauthbearer.grant.type",
            "sasl.oauthbearer.assertion.algorithm",
            "sasl.oauthbearer.assertion.private.key.file",
            "sasl.oauthbearer.assertion.private.key.passphrase",
            "sasl.oauthbearer.assertion.private.key.pem",
            "sasl.oauthbearer.assertion.file",
            "sasl.oauthbearer.assertion.claim.aud",
            "sasl.oauthbearer.assertion.claim.exp.seconds",
            "sasl.oauthbearer.assertion.claim.iss",
            "sasl.oauthbearer.assertion.claim.jti.include",
            "sasl.oauthbearer.assertion.claim.nbf.seconds",
            "sasl.oauthbearer.assertion.claim.sub",
            "sasl.oauthbearer.assertion.jwt.template.file",
            "sasl.oauthbearer.metadata.authentication.type",
            "plugin.library.paths",
            "interceptors",
            "group.id",
            "group.instance.id",
            "partition.assignment.strategy",
            "session.timeout.ms",
            "heartbeat.interval.ms",
            "group.protocol.type",
            "group.protocol",
            "group.remote.assignor",
            "coordinator.query.interval.ms",
            "max.poll.interval.ms",
            "auto.commit.interval.ms",
            "enable.auto.offset.store",
            "queued.min.messages",
            "queued.max.messages.kbytes",
            "fetch.wait.max.ms",
            "fetch.queue.backoff.ms",
            "fetch.message.max.bytes",
            "max.partition.fetch.bytes",
            "fetch.max.bytes",
            "fetch.min.bytes",
            "fetch.error.backoff.ms",
            "offset.store.method",
            "isolation.level",
            "consume_cb",
            "rebalance_cb",
            "offset_commit_cb",
            "enable.partition.eof",
            "check.crcs",
            "client.rack",
            "transactional.id",
            "transaction.timeout.ms",
            "enable.idempotence",
            "enable.gapless.guarantee",
            "queue.buffering.max.messages",
            "queue.buffering.max.kbytes",
            "queue.buffering.max.ms",
            "linger.ms",
            "message.send.max.retries",
            "retries",
            "retry.backoff.ms",
            "retry.backoff.max.ms",
            "queue.buffering.backpressure.threshold",
            "compression.codec",
            "compression.type",
            "batch.num.messages",
            "batch.size",
            "delivery.report.only.error",
            "dr_cb",
            "dr_msg_cb",
            "sticky.partitioning.linger.ms",
            "client.dns.lookup",
            "enable.metrics.push",
            "request.required.acks",
            "acks",
            "request.timeout.ms",
            "message.timeout.ms",
            "delivery.timeout.ms",
            "queuing.strategy",
            "produce.offset.report",
            "partitioner",
            "partitioner_cb",
            "msg_order_cmp",
            "compression.level",
            "auto.commit.enable",
            "enable.auto.commit",
            "auto.offset.reset",
            "offset.store.path",
            "offset.store.sync.interval.ms",
            "consume.callback.max.messages");

    /** Format identifier for Protobuf-encoded records. */
    public static final String KAFKA_FORMAT_PROTOBUF = "Protobuf";

    /** Format identifier for JSON-encoded records. */
    public static final String KAFKA_FORMAT_JSON = "Json";

    // JSON field names of the protobuf format configuration passed to the native layer.
    public static final String KAFKA_PB_FORMAT_PB_DESC_FILE_FIELD = "pb_desc_file";
    public static final String KAFKA_PB_FORMAT_ROOT_MESSAGE_NAME_FIELD = "root_message_name";
    public static final String KAFKA_PB_FORMAT_SKIP_FIELDS_FIELD = "skip_fields";
    public static final String KAFKA_PB_FORMAT_NESTED_COL_MAPPING_FIELD = "nested_col_mapping";
}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/auron-flink-extension/auron-flink-runtime/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..2e075a36
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.auron.flink.connector.kafka.AuronKafkaDynamicTableFactory
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactoryTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactoryTest.java
new file mode 100644
index 00000000..b3004c95
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/connector/kafka/AuronKafkaDynamicTableFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.connector.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.junit.jupiter.api.Test;
+
+/**
+ * This class is used to test the AuronKafkaDynamicTableFactory.
+ */
+public class AuronKafkaDynamicTableFactoryTest {
+
+    @Test
+    public void testDiscoveryTableFactory() {
+        DynamicTableSourceFactory factory = FactoryUtil.discoverFactory(
+                Thread.currentThread().getContextClassLoader(),
+                DynamicTableSourceFactory.class,
+                AuronKafkaDynamicTableFactory.IDENTIFIER);
+        assertInstanceOf(
+                AuronKafkaDynamicTableFactory.class,
+                factory,
+                "The factory should be an instance of 
AuronKafkaDynamicTableFactory.");
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/auron-flink-extension/auron-flink-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..2e075a36
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.auron.flink.connector.kafka.AuronKafkaDynamicTableFactory

Reply via email to