This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a5795d204 [flink][refactor] Move Kafka related code into paimon-flink-cdc module (#1988)
a5795d204 is described below

commit a5795d204e823864bf9ef7d3a48da87ec85005c7
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 12 16:51:10 2023 +0800

    [flink][refactor] Move Kafka related code into paimon-flink-cdc module (#1988)
---
 paimon-docs/pom.xml                                |   7 ++
 .../configuration/ConfigOptionsDocGenerator.java   |   2 +-
 paimon-flink/paimon-flink-cdc/pom.xml              |  95 +++++++++++----
 .../flink/kafka/KafkaLogDeserializationSchema.java |   0
 .../apache/paimon/flink/kafka/KafkaLogOptions.java |   0
 .../flink/kafka/KafkaLogSerializationSchema.java   |   0
 .../paimon/flink/kafka/KafkaLogSinkProvider.java   |   0
 .../paimon/flink/kafka/KafkaLogSourceProvider.java |   0
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |   0
 .../paimon/flink/kafka/KafkaLogStoreRegister.java  |   0
 .../paimon/flink/kafka/KafkaSinkFunction.java      |   0
 .../services/org.apache.paimon.factories.Factory   |   2 +
 ...ndMultiPartitionedTableWithKafkaLogITCase.java} |   7 +-
 .../ComputedColumnAndWatermarkTableITCase.java     |   8 +-
 .../flink/kafka/KafkaLogSerializationTest.java     |   0
 .../flink/kafka/KafkaLogStoreFactoryTest.java      | 128 +++++++++++++++++++++
 .../flink/kafka/KafkaLogStoreRegisterITCase.java   |   6 +-
 .../paimon/flink/kafka/KafkaLogTestUtils.java      |  35 ++++++
 .../paimon/flink/kafka/KafkaTableTestBase.java     |   0
 .../paimon/flink/kafka}/LogSystemITCase.java       |  60 +++++++++-
 .../StreamingReadWriteTableWithKafkaLogITCase.java |   5 +-
 .../flink/kafka}/StreamingWarehouseITCase.java     |   3 +-
 .../flink/source/LogHybridSourceFactoryTest.java   |   0
 paimon-flink/paimon-flink-common/pom.xml           |  22 ----
 .../paimon/flink/AbstractFlinkTableFactory.java    |   2 +-
 .../services/org.apache.paimon.factories.Factory   |   2 -
 .../flink/AbstractFlinkTableFactoryTest.java       |  53 ---------
 .../org/apache/paimon/flink/ChangelogModeTest.java |  23 +---
 .../paimon/flink/FileSystemCatalogITCase.java      |  77 +++----------
 .../paimon/flink/util/ReadWriteTableTestUtil.java  |  40 +------
 30 files changed, 338 insertions(+), 239 deletions(-)

diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml
index 962ddfcf7..8cdb2f078 100644
--- a/paimon-docs/pom.xml
+++ b/paimon-docs/pom.xml
@@ -50,6 +50,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-cdc</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index 83c883f9c..a8b9cc0bf 100644
--- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -77,7 +77,7 @@ public class ConfigOptionsDocGenerator {
                 new OptionsClassLocation(
                         "paimon-flink/paimon-flink-common", 
"org.apache.paimon.flink"),
                 new OptionsClassLocation(
-                        "paimon-flink/paimon-flink-common", 
"org.apache.paimon.flink.kafka"),
+                        "paimon-flink/paimon-flink-cdc", 
"org.apache.paimon.flink.kafka"),
                 new OptionsClassLocation(
                         "paimon-hive/paimon-hive-catalog", 
"org.apache.paimon.hive"),
                 new OptionsClassLocation(
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml
index 0c6b5b34a..d8e1591d2 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -79,6 +79,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- CDC dependencies -->
 
         <dependency>
@@ -173,6 +180,49 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
@@ -218,31 +268,34 @@ under the License.
             <version>${mongodb.testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-test-utils</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>junit</groupId>
-                    <artifactId>junit</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.junit.vintage</groupId>
-                    <artifactId>junit-vintage-engine</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
     </dependencies>
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <!-- Same as flink-sql-connector-kafka. -->
+                                <relocation>
+                                    <pattern>org.apache.kafka</pattern>
+                                    <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
rename to paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
diff --git a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3e165cf5f..1e3a2ff65 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+org.apache.paimon.flink.kafka.KafkaLogStoreFactory
+
 ### cdc action factories
 org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableActionFactory
 org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseActionFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/CompositePkAndMultiPartitionedTableWithKafkaLogITCase.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/CompositePkAndMultiPartitionedTableWithKafkaLogITCase.java
index 0c09bd2a4..770757258 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/CompositePkAndMultiPartitionedTableWithKafkaLogITCase.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
 
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.types.Row;
@@ -31,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.createTableWithKafkaLog;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.SCAN_LATEST;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.assertNoMoreRecords;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
@@ -38,7 +38,6 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.checkFileStorePath;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTableWithKafkaLog;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
@@ -53,7 +52,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream
  * IT cases of streaming reading and writing tables which have composite primary keys and multiple
  * partition fields with Kafka log.
  */
-public class CompositePkAndMultiPartitionedTableWIthKafkaLogITCase extends KafkaTableTestBase {
+public class CompositePkAndMultiPartitionedTableWithKafkaLogITCase extends KafkaTableTestBase {
 
     @BeforeEach
     public void setUp() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ComputedColumnAndWatermarkTableITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ComputedColumnAndWatermarkTableITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.java
index 9cb99297f..32c6d626d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ComputedColumnAndWatermarkTableITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
+package org.apache.paimon.flink.kafka;
 
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.BeforeEach;
@@ -31,12 +29,12 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.createTableWithKafkaLog;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.SCAN_LATEST;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTable;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTableWithKafkaLog;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertIntoFromTable;
@@ -48,7 +46,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingR
 public class ComputedColumnAndWatermarkTableITCase extends KafkaTableTestBase {
 
     @BeforeEach
-    public void setUp() throws Exception {
+    public void setUp() {
         init(createAndRegisterTempFile("").toString());
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.java
new file mode 100644
index 000000000..0b127491a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.kafka;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.AbstractFlinkTableFactory;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.flink.sink.FlinkTableSink;
+import org.apache.paimon.flink.source.DataTableSource;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT about {@link KafkaLogStoreFactory}. */
+public class KafkaLogStoreFactoryTest {
+
+    @ParameterizedTest
+    @EnumSource(CoreOptions.StartupMode.class)
+    public void testCreateKafkaLogStoreFactory(CoreOptions.StartupMode startupMode) {
+        Map<String, String> dynamicOptions = new HashMap<>();
+        dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
+        dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+        if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT
+                || startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
+            dynamicOptions.put(SCAN_SNAPSHOT_ID.key(), "1");
+        } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+            dynamicOptions.put(
+                    SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
+        }
+        dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+        DynamicTableFactory.Context context =
+                KafkaLogTestUtils.testContext(
+                        "table",
+                        "",
+                        CoreOptions.LogChangelogMode.AUTO,
+                        CoreOptions.LogConsistency.TRANSACTIONAL,
+                        RowType.of(new IntType(), new IntType()),
+                        new int[] {0},
+                        dynamicOptions);
+
+        try {
+            Optional<LogStoreTableFactory> optional =
+                    AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
+            assertThat(startupMode)
+                    .isNotIn(
+                            CoreOptions.StartupMode.FROM_SNAPSHOT,
+                            CoreOptions.StartupMode.FROM_SNAPSHOT_FULL);
+            assertThat(optional.isPresent()).isTrue();
+            assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
+        } catch (ValidationException e) {
+            assertThat(startupMode)
+                    .isIn(
+                            CoreOptions.StartupMode.FROM_SNAPSHOT,
+                            CoreOptions.StartupMode.FROM_SNAPSHOT_FULL);
+        }
+    }
+
+    @Test
+    public void testInputChangelogProducerWithKafkaLog(@TempDir java.nio.file.Path temp)
+            throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+
+        Path path = new Path(temp.toUri().toString());
+        new SchemaManager(LocalFileIO.create(), path)
+                .createTable(
+                        new Schema(
+                                org.apache.paimon.types.RowType.of(
+                                                new org.apache.paimon.types.IntType(),
+                                                new org.apache.paimon.types.IntType())
+                                        .getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("f0"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+
+        ObjectIdentifier identifier = ObjectIdentifier.of("c", "d", "t");
+        DataTableSource source =
+                new DataTableSource(identifier, table, true, null, new KafkaLogStoreFactory());
+        assertThat(source.getChangelogMode()).isEqualTo(ChangelogMode.upsert());
+
+        FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null);
+        assertThat(sink.getChangelogMode(ChangelogMode.all())).isEqualTo(ChangelogMode.all());
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
index 08e182464..30e7d1de1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
@@ -24,7 +24,7 @@ import org.apache.paimon.options.Options;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.junit.After;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -48,7 +48,7 @@ public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase {
 
     private static final String TABLE = "mock_table";
 
-    @After
+    @AfterEach
     public void tearDown() {
         // clean up all the topics
         try (AdminClient admin = createAdminClient()) {
@@ -157,7 +157,7 @@ public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase {
         tableOptions.set(BOOTSTRAP_SERVERS, bootstrapServers);
 
         if (topic != null) {
-            tableOptions.set(KafkaLogOptions.TOPIC, topic);
+            tableOptions.set(TOPIC, topic);
         }
 
         if (partition != null) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
similarity index 85%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index 583c69e77..b7f9ae91b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.kafka;
 
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.table.sink.SinkRecord;
@@ -56,11 +57,17 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+import static org.apache.paimon.flink.kafka.KafkaTableTestBase.createTopicIfNotExists;
+import static org.apache.paimon.flink.kafka.KafkaTableTestBase.getBootstrapServers;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTable;
 import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
 
 /** Utils for the test of {@link KafkaLogStoreFactory}. */
@@ -206,4 +213,32 @@ public class KafkaLogTestUtils {
                 hasPk ? row(pk) : EMPTY_ROW,
                 GenericRow.ofKind(rowKind, pk, value));
     }
+
+    static String createTableWithKafkaLog(
+            List<String> fieldsSpec,
+            List<String> primaryKeys,
+            List<String> partitionKeys,
+            boolean manuallyCreateLogTable) {
+        String topic = "topic_" + UUID.randomUUID();
+        String table =
+                createTable(
+                        fieldsSpec,
+                        primaryKeys,
+                        partitionKeys,
+                        new HashMap<String, String>() {
+                            {
+                                put(LOG_SYSTEM.key(), "kafka");
+                                put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+                                put(TOPIC.key(), topic);
+                                put(DYNAMIC_PARTITION_OVERWRITE.key(), "false");
+                                put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+                            }
+                        });
+
+        if (manuallyCreateLogTable) {
+            createTopicIfNotExists(topic, 1);
+        }
+
+        return table;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
similarity index 86%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
index 201f3db5f..2b4a89f37 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
 
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.table.api.TableResult;
@@ -33,6 +32,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -321,4 +321,60 @@ public class LogSystemITCase extends KafkaTableTestBase {
 
         checkTopicExists("T", 2, 1);
     }
+
+    @Test
+    public void testLogWriteRead() throws Exception {
+        String topic = UUID.randomUUID().toString();
+
+        try {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
+                                    + "'log.system'='kafka', "
+                                    + "'kafka.bootstrap.servers'='%s',"
+                                    + "'kafka.topic'='%s'"
+                                    + ")",
+                            getBootstrapServers(), topic));
+
+            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+            BlockingIterator<Row, Row> iterator =
+                    BlockingIterator.of(tEnv.from("T").execute().collect());
+            List<Row> result = iterator.collectAndClose(2);
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+        } finally {
+            deleteTopicIfExists(topic);
+        }
+    }
+
+    @Test
+    public void testLogWriteReadWithVirtual() throws Exception {
+        String topic = UUID.randomUUID().toString();
+        createTopicIfNotExists(topic, 1);
+
+        try {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE T ("
+                                    + "a STRING, "
+                                    + "b STRING, "
+                                    + "c STRING, "
+                                    + "d AS CAST(c as INT) + 1"
+                                    + ") WITH ("
+                                    + "'log.system'='kafka', "
+                                    + "'kafka.bootstrap.servers'='%s',"
+                                    + "'kafka.topic'='%s'"
+                                    + ")",
+                            getBootstrapServers(), topic));
+
+            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+            BlockingIterator<Row, Row> iterator =
+                    BlockingIterator.of(tEnv.from("T").execute().collect());
+            List<Row> result = iterator.collectAndClose(2);
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of("1", "2", "3", 4), Row.of("4", "5", "6", 7));
+        } finally {
+            deleteTopicIfExists(topic);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingReadWriteTableWithKafkaLogITCase.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingReadWriteTableWithKafkaLogITCase.java
index c028c13e6..fb9e7d08d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingReadWriteTableWithKafkaLogITCase.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.types.Row;
@@ -35,13 +34,13 @@ import java.util.Map;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.createTableWithKafkaLog;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.SCAN_LATEST;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.assertNoMoreRecords;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.checkFileStorePath;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTableWithKafkaLog;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingWarehouseITCase.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingWarehouseITCase.java
index 8b414379c..9c21f491c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingWarehouseITCase.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
 
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.common.JobStatus;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index f94738206..87b985005 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -73,13 +73,6 @@ under the License.
             <version>${frocksdbjni.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
@@ -219,14 +212,6 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka</artifactId>
-            <version>${flink.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -247,13 +232,6 @@ under the License.
                                     <include>org.apache.paimon:paimon-bundle</include>
                                 </includes>
                             </artifactSet>
-                            <relocations>
-                                <!-- Same as flink-sql-connector-kafka. -->
-                                <relocation>
-                                    <pattern>org.apache.kafka</pattern>
-                                    <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
-                                </relocation>
-                            </relocations>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index a9d72f003..31acd85b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -157,7 +157,7 @@ public abstract class AbstractFlinkTableFactory
 
     // ~ Tools ------------------------------------------------------------------
 
-    static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
+    public static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
             DynamicTableFactory.Context context) {
         return createOptionalLogStoreFactory(
                context.getClassLoader(), context.getCatalogTable().getOptions());
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1e014e785..847233822 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.paimon.flink.kafka.KafkaLogStoreFactory
-
 ### action factories
 org.apache.paimon.flink.action.CompactActionFactory
 org.apache.paimon.flink.action.CompactDatabaseActionFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index b2e829dbd..41958a016 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -18,31 +18,14 @@
 
 package org.apache.paimon.flink;
 
-import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
-import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
 
-import static org.apache.paimon.CoreOptions.LogChangelogMode;
-import static org.apache.paimon.CoreOptions.LogConsistency;
-import static org.apache.paimon.CoreOptions.SCAN_MODE;
-import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
-import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
-import static org.apache.paimon.CoreOptions.StartupMode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AbstractFlinkTableFactory}. */
@@ -70,42 +53,6 @@ public class AbstractFlinkTableFactoryTest {
                 true);
     }
 
-    @ParameterizedTest
-    @EnumSource(StartupMode.class)
-    public void testCreateKafkaLogStoreFactory(StartupMode startupMode) {
-        Map<String, String> dynamicOptions = new HashMap<>();
-        dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
-        dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
-        if (startupMode == StartupMode.FROM_SNAPSHOT
-                || startupMode == StartupMode.FROM_SNAPSHOT_FULL) {
-            dynamicOptions.put(SCAN_SNAPSHOT_ID.key(), "1");
-        } else if (startupMode == StartupMode.FROM_TIMESTAMP) {
-            dynamicOptions.put(
-                    SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
-        }
-        dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
-        DynamicTableFactory.Context context =
-                KafkaLogTestUtils.testContext(
-                        "table",
-                        "",
-                        LogChangelogMode.AUTO,
-                        LogConsistency.TRANSACTIONAL,
-                        RowType.of(new IntType(), new IntType()),
-                        new int[] {0},
-                        dynamicOptions);
-
-        try {
-            Optional<LogStoreTableFactory> optional =
-                    AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
-            assertThat(startupMode)
-                    .isNotIn(StartupMode.FROM_SNAPSHOT, StartupMode.FROM_SNAPSHOT_FULL);
-            assertThat(optional.isPresent()).isTrue();
-            assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
-        } catch (ValidationException e) {
-            assertThat(startupMode).isIn(StartupMode.FROM_SNAPSHOT, StartupMode.FROM_SNAPSHOT_FULL);
-        }
-    }
-
     private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
        assertThat(AbstractFlinkTableFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals);
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
index 71117b413..9022bbb50 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
@@ -19,8 +19,6 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
-import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.source.DataTableSource;
 import org.apache.paimon.fs.Path;
@@ -39,8 +37,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,15 +57,6 @@ public class ChangelogModeTest {
 
    private void test(Options options, ChangelogMode expectSource, ChangelogMode expectSink)
             throws Exception {
-        test(options, expectSource, expectSink, null);
-    }
-
-    private void test(
-            Options options,
-            ChangelogMode expectSource,
-            ChangelogMode expectSink,
-            @Nullable LogStoreTableFactory logStoreTableFactory)
-            throws Exception {
         new SchemaManager(LocalFileIO.create(), path)
                 .createTable(
                         new Schema(
@@ -80,8 +67,7 @@ public class ChangelogModeTest {
                                 ""));
        FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
 
-        DataTableSource source =
-                new DataTableSource(identifier, table, true, null, logStoreTableFactory);
+        DataTableSource source = new DataTableSource(identifier, table, true, null, null);
         assertThat(source.getChangelogMode()).isEqualTo(expectSource);
 
        FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null);
@@ -106,11 +92,4 @@ public class ChangelogModeTest {
        options.set(CoreOptions.LOG_CHANGELOG_MODE, CoreOptions.LogChangelogMode.ALL);
         test(options, ChangelogMode.all(), ChangelogMode.all());
     }
-
-    @Test
-    public void testInputChangelogProducerWithLog() throws Exception {
-        Options options = new Options();
-        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
-        test(options, ChangelogMode.upsert(), ChangelogMode.all(), new KafkaLogStoreFactory());
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 9374bc8ba..00a47cd80 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -21,37 +21,48 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
+import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for {@link FlinkCatalog}. */
-public class FileSystemCatalogITCase extends KafkaTableTestBase {
+public class FileSystemCatalogITCase extends AbstractTestBase {
 
-    private String path;
     private static final String DB_NAME = "default";
 
+    private String path;
+    private StreamTableEnvironment tEnv;
+
     @BeforeEach
-    public void before() throws IOException {
+    public void setup() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(1);
+
+        tEnv = StreamTableEnvironment.create(env);
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
         path = getTempDirPath();
         tEnv.executeSql(
                String.format("CREATE CATALOG fs WITH ('type'='paimon', 'warehouse'='%s')", path));
-        env.setParallelism(1);
     }
 
     @Test
@@ -91,58 +102,6 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
         assertThat(result).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
     }
 
-    @Test
-    public void testLogWriteRead() throws Exception {
-        String topic = UUID.randomUUID().toString();
-
-        try {
-            tEnv.useCatalog("fs");
-            tEnv.executeSql(
-                    String.format(
-                            "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
-                                    + "'log.system'='kafka', "
-                                    + "'kafka.bootstrap.servers'='%s',"
-                                    + "'kafka.topic'='%s'"
-                                    + ")",
-                            getBootstrapServers(), topic));
-            innerTestWriteRead();
-        } finally {
-            deleteTopicIfExists(topic);
-        }
-    }
-
-    @Test
-    public void testLogWriteReadWithVirtual() throws Exception {
-        String topic = UUID.randomUUID().toString();
-        createTopicIfNotExists(topic, 1);
-
-        try {
-            tEnv.useCatalog("fs");
-            tEnv.executeSql(
-                    String.format(
-                            "CREATE TABLE T ("
-                                    + "a STRING, "
-                                    + "b STRING, "
-                                    + "c STRING, "
-                                    + "d AS CAST(c as INT) + 1"
-                                    + ") WITH ("
-                                    + "'log.system'='kafka', "
-                                    + "'kafka.bootstrap.servers'='%s',"
-                                    + "'kafka.topic'='%s'"
-                                    + ")",
-                            getBootstrapServers(), topic));
-
-            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
-            BlockingIterator<Row, Row> iterator =
-                    BlockingIterator.of(tEnv.from("T").execute().collect());
-            List<Row> result = iterator.collectAndClose(2);
-            assertThat(result)
-                    .containsExactlyInAnyOrder(Row.of("1", "2", "3", 4), Row.of("4", "5", "6", 7));
-        } finally {
-            deleteTopicIfExists(topic);
-        }
-    }
-
     @Test
     public void testCatalogOptionsInheritAndOverride() throws Exception {
         tEnv.executeSql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index b87ca1277..44783009d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -19,9 +19,7 @@
 package org.apache.paimon.flink.util;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.WriteMode;
 import org.apache.paimon.flink.ReadWriteTableITCase;
-import org.apache.paimon.flink.StreamingReadWriteTableWithKafkaLogITCase;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -48,17 +46,9 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
-import static org.apache.paimon.CoreOptions.WRITE_MODE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
-import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
-import static org.apache.paimon.flink.kafka.KafkaTableTestBase.createTopicIfNotExists;
-import static org.apache.paimon.flink.kafka.KafkaTableTestBase.getBootstrapServers;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/**
- * Test util for {@link ReadWriteTableITCase} and {@link StreamingReadWriteTableWithKafkaLogITCase}.
- */
+/** Test util for {@link ReadWriteTableITCase}. */
 public class ReadWriteTableTestUtil {
 
     private static final Time TIME_OUT = Time.seconds(10);
@@ -135,34 +125,6 @@ public class ReadWriteTableTestUtil {
         return table;
     }
 
-    public static String createTableWithKafkaLog(
-            List<String> fieldsSpec,
-            List<String> primaryKeys,
-            List<String> partitionKeys,
-            boolean manuallyCreateLogTable) {
-        String topic = "topic_" + UUID.randomUUID();
-        String table =
-                createTable(
-                        fieldsSpec,
-                        primaryKeys,
-                        partitionKeys,
-                        new HashMap<String, String>() {
-                            {
-                                put(LOG_SYSTEM.key(), "kafka");
-                                put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
-                                put(TOPIC.key(), topic);
-                                put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
-                                put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
-                            }
-                        });
-
-        if (manuallyCreateLogTable) {
-            createTopicIfNotExists(topic, 1);
-        }
-
-        return table;
-    }
-
     public static String createTemporaryTable(
             List<String> fieldsSpec,
             List<String> primaryKeys,
