This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a5795d204 [flink][refactor] Move Kafka related code into
paimon-flink-cdc module (#1988)
a5795d204 is described below
commit a5795d204e823864bf9ef7d3a48da87ec85005c7
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 12 16:51:10 2023 +0800
[flink][refactor] Move Kafka related code into paimon-flink-cdc module
(#1988)
---
paimon-docs/pom.xml | 7 ++
.../configuration/ConfigOptionsDocGenerator.java | 2 +-
paimon-flink/paimon-flink-cdc/pom.xml | 95 +++++++++++----
.../flink/kafka/KafkaLogDeserializationSchema.java | 0
.../apache/paimon/flink/kafka/KafkaLogOptions.java | 0
.../flink/kafka/KafkaLogSerializationSchema.java | 0
.../paimon/flink/kafka/KafkaLogSinkProvider.java | 0
.../paimon/flink/kafka/KafkaLogSourceProvider.java | 0
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 0
.../paimon/flink/kafka/KafkaLogStoreRegister.java | 0
.../paimon/flink/kafka/KafkaSinkFunction.java | 0
.../services/org.apache.paimon.factories.Factory | 2 +
...ndMultiPartitionedTableWithKafkaLogITCase.java} | 7 +-
.../ComputedColumnAndWatermarkTableITCase.java | 8 +-
.../flink/kafka/KafkaLogSerializationTest.java | 0
.../flink/kafka/KafkaLogStoreFactoryTest.java | 128 +++++++++++++++++++++
.../flink/kafka/KafkaLogStoreRegisterITCase.java | 6 +-
.../paimon/flink/kafka/KafkaLogTestUtils.java | 35 ++++++
.../paimon/flink/kafka/KafkaTableTestBase.java | 0
.../paimon/flink/kafka}/LogSystemITCase.java | 60 +++++++++-
.../StreamingReadWriteTableWithKafkaLogITCase.java | 5 +-
.../flink/kafka}/StreamingWarehouseITCase.java | 3 +-
.../flink/source/LogHybridSourceFactoryTest.java | 0
paimon-flink/paimon-flink-common/pom.xml | 22 ----
.../paimon/flink/AbstractFlinkTableFactory.java | 2 +-
.../services/org.apache.paimon.factories.Factory | 2 -
.../flink/AbstractFlinkTableFactoryTest.java | 53 ---------
.../org/apache/paimon/flink/ChangelogModeTest.java | 23 +---
.../paimon/flink/FileSystemCatalogITCase.java | 77 +++----------
.../paimon/flink/util/ReadWriteTableTestUtil.java | 40 +------
30 files changed, 338 insertions(+), 239 deletions(-)
diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml
index 962ddfcf7..8cdb2f078 100644
--- a/paimon-docs/pom.xml
+++ b/paimon-docs/pom.xml
@@ -50,6 +50,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-cdc</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-spark-common</artifactId>
diff --git
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index 83c883f9c..a8b9cc0bf 100644
---
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -77,7 +77,7 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation(
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink"),
new OptionsClassLocation(
- "paimon-flink/paimon-flink-common", "org.apache.paimon.flink.kafka"),
+ "paimon-flink/paimon-flink-cdc", "org.apache.paimon.flink.kafka"),
new OptionsClassLocation(
"paimon-hive/paimon-hive-catalog", "org.apache.paimon.hive"),
new OptionsClassLocation(
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml
b/paimon-flink/paimon-flink-cdc/pom.xml
index 0c6b5b34a..d8e1591d2 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -79,6 +79,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- CDC dependencies -->
<dependency>
@@ -173,6 +180,49 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -218,31 +268,34 @@ under the License.
<version>${mongodb.testcontainers.version}</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-test-utils</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-paimon</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <!-- Same as flink-sql-connector-kafka. -->
+ <relocation>
+ <pattern>org.apache.kafka</pattern>
+ <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogOptions.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSerializationSchema.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaSinkFunction.java
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3e165cf5f..1e3a2ff65 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-cdc/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.paimon.flink.kafka.KafkaLogStoreFactory
+
### cdc action factories
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableActionFactory
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseActionFactory
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/CompositePkAndMultiPartitionedTableWithKafkaLogITCase.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/CompositePkAndMultiPartitionedTableWithKafkaLogITCase.java
index 0c09bd2a4..770757258 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CompositePkAndMultiPartitionedTableWIthKafkaLogITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/CompositePkAndMultiPartitionedTableWithKafkaLogITCase.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
@@ -31,6 +30,7 @@ import java.util.Collections;
import java.util.List;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static
org.apache.paimon.flink.kafka.KafkaLogTestUtils.createTableWithKafkaLog;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.SCAN_LATEST;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.assertNoMoreRecords;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
@@ -38,7 +38,6 @@ import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.checkFileStorePath;
-import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTableWithKafkaLog;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
@@ -53,7 +52,7 @@ import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream
* IT cases of streaming reading and writing tables which have composite
primary keys and multiple
* partition fields with Kafka log.
*/
-public class CompositePkAndMultiPartitionedTableWIthKafkaLogITCase extends KafkaTableTestBase {
+public class CompositePkAndMultiPartitionedTableWithKafkaLogITCase extends KafkaTableTestBase {
@BeforeEach
public void setUp() {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ComputedColumnAndWatermarkTableITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ComputedColumnAndWatermarkTableITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.java
index 9cb99297f..32c6d626d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ComputedColumnAndWatermarkTableITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/ComputedColumnAndWatermarkTableITCase.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
+package org.apache.paimon.flink.kafka;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
@@ -31,12 +29,12 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static
org.apache.paimon.flink.kafka.KafkaLogTestUtils.createTableWithKafkaLog;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.SCAN_LATEST;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTable;
-import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTableWithKafkaLog;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertIntoFromTable;
@@ -48,7 +46,7 @@ import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingR
public class ComputedColumnAndWatermarkTableITCase extends KafkaTableTestBase {
@BeforeEach
- public void setUp() throws Exception {
+ public void setUp() {
init(createAndRegisterTempFile("").toString());
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogSerializationTest.java
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.java
new file mode 100644
index 000000000..0b127491a
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.kafka;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.AbstractFlinkTableFactory;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.flink.sink.FlinkTableSink;
+import org.apache.paimon.flink.source.DataTableSource;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT about {@link KafkaLogStoreFactory}. */
+public class KafkaLogStoreFactoryTest {
+
+ @ParameterizedTest
+ @EnumSource(CoreOptions.StartupMode.class)
+ public void testCreateKafkaLogStoreFactory(CoreOptions.StartupMode startupMode) {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
+ dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+ if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT
+ || startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
+ dynamicOptions.put(SCAN_SNAPSHOT_ID.key(), "1");
+ } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
+ dynamicOptions.put(
+ SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
+ }
+ dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
+ DynamicTableFactory.Context context =
+ KafkaLogTestUtils.testContext(
+ "table",
+ "",
+ CoreOptions.LogChangelogMode.AUTO,
+ CoreOptions.LogConsistency.TRANSACTIONAL,
+ RowType.of(new IntType(), new IntType()),
+ new int[] {0},
+ dynamicOptions);
+
+ try {
+ Optional<LogStoreTableFactory> optional =
+ AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
+ assertThat(startupMode)
+ .isNotIn(
+ CoreOptions.StartupMode.FROM_SNAPSHOT,
+ CoreOptions.StartupMode.FROM_SNAPSHOT_FULL);
+ assertThat(optional.isPresent()).isTrue();
+ assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
+ } catch (ValidationException e) {
+ assertThat(startupMode)
+ .isIn(
+ CoreOptions.StartupMode.FROM_SNAPSHOT,
+ CoreOptions.StartupMode.FROM_SNAPSHOT_FULL);
+ }
+ }
+
+ @Test
+ public void testInputChangelogProducerWithKafkaLog(@TempDir java.nio.file.Path temp)
+ throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+
+ Path path = new Path(temp.toUri().toString());
+ new SchemaManager(LocalFileIO.create(), path)
+ .createTable(
+ new Schema(
+ org.apache.paimon.types.RowType.of(
+ new org.apache.paimon.types.IntType(),
+ new org.apache.paimon.types.IntType())
+ .getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("f0"),
+ options.toMap(),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+
+ ObjectIdentifier identifier = ObjectIdentifier.of("c", "d", "t");
+ DataTableSource source =
+ new DataTableSource(identifier, table, true, null, new KafkaLogStoreFactory());
+ assertThat(source.getChangelogMode()).isEqualTo(ChangelogMode.upsert());
+
+ FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null);
+ assertThat(sink.getChangelogMode(ChangelogMode.all())).isEqualTo(ChangelogMode.all());
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
index 08e182464..30e7d1de1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
@@ -24,7 +24,7 @@ import org.apache.paimon.options.Options;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
-import org.junit.After;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -48,7 +48,7 @@ public class KafkaLogStoreRegisterITCase extends
KafkaTableTestBase {
private static final String TABLE = "mock_table";
- @After
+ @AfterEach
public void tearDown() {
// clean up all the topics
try (AdminClient admin = createAdminClient()) {
@@ -157,7 +157,7 @@ public class KafkaLogStoreRegisterITCase extends
KafkaTableTestBase {
tableOptions.set(BOOTSTRAP_SERVERS, bootstrapServers);
if (topic != null) {
- tableOptions.set(KafkaLogOptions.TOPIC, topic);
+ tableOptions.set(TOPIC, topic);
}
if (partition != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
similarity index 85%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
index 583c69e77..b7f9ae91b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaLogTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.kafka;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.LogConsistency;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.table.sink.SinkRecord;
@@ -56,11 +57,17 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+import static
org.apache.paimon.flink.kafka.KafkaTableTestBase.createTopicIfNotExists;
+import static
org.apache.paimon.flink.kafka.KafkaTableTestBase.getBootstrapServers;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTable;
import static
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
/** Utils for the test of {@link KafkaLogStoreFactory}. */
@@ -206,4 +213,32 @@ public class KafkaLogTestUtils {
hasPk ? row(pk) : EMPTY_ROW,
GenericRow.ofKind(rowKind, pk, value));
}
+
+ static String createTableWithKafkaLog(
+ List<String> fieldsSpec,
+ List<String> primaryKeys,
+ List<String> partitionKeys,
+ boolean manuallyCreateLogTable) {
+ String topic = "topic_" + UUID.randomUUID();
+ String table =
+ createTable(
+ fieldsSpec,
+ primaryKeys,
+ partitionKeys,
+ new HashMap<String, String>() {
+ {
+ put(LOG_SYSTEM.key(), "kafka");
+ put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+ put(TOPIC.key(), topic);
+ put(DYNAMIC_PARTITION_OVERWRITE.key(), "false");
+ put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+ }
+ });
+
+ if (manuallyCreateLogTable) {
+ createTopicIfNotExists(topic, 1);
+ }
+
+ return table;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
similarity index 100%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/KafkaTableTestBase.java
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
similarity index 86%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
index 201f3db5f..2b4a89f37 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LogSystemITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/LogSystemITCase.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.table.api.TableResult;
@@ -33,6 +32,7 @@ import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.List;
+import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -321,4 +321,60 @@ public class LogSystemITCase extends KafkaTableTestBase {
checkTopicExists("T", 2, 1);
}
+
+ @Test
+ public void testLogWriteRead() throws Exception {
+ String topic = UUID.randomUUID().toString();
+
+ try {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
+ + "'log.system'='kafka', "
+ + "'kafka.bootstrap.servers'='%s',"
+ + "'kafka.topic'='%s'"
+ + ")",
+ getBootstrapServers(), topic));
+
+ tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(tEnv.from("T").execute().collect());
+ List<Row> result = iterator.collectAndClose(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6"));
+ } finally {
+ deleteTopicIfExists(topic);
+ }
+ }
+
+ @Test
+ public void testLogWriteReadWithVirtual() throws Exception {
+ String topic = UUID.randomUUID().toString();
+ createTopicIfNotExists(topic, 1);
+
+ try {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE T ("
+ + "a STRING, "
+ + "b STRING, "
+ + "c STRING, "
+ + "d AS CAST(c as INT) + 1"
+ + ") WITH ("
+ + "'log.system'='kafka', "
+ + "'kafka.bootstrap.servers'='%s',"
+ + "'kafka.topic'='%s'"
+ + ")",
+ getBootstrapServers(), topic));
+
+ tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(tEnv.from("T").execute().collect());
+ List<Row> result = iterator.collectAndClose(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of("1", "2", "3", 4), Row.of("4", "5", "6", 7));
+ } finally {
+ deleteTopicIfExists(topic);
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingReadWriteTableWithKafkaLogITCase.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.java
rename to
paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingReadWriteTableWithKafkaLogITCase.java
index c028c13e6..fb9e7d08d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingReadWriteTableWithKafkaLogITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingReadWriteTableWithKafkaLogITCase.java
@@ -16,10 +16,9 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
@@ -35,13 +34,13 @@ import java.util.Map;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.createTableWithKafkaLog;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.SCAN_LATEST;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.assertNoMoreRecords;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQueryWithTableOptions;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.checkFileStorePath;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTableWithKafkaLog;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.createTemporaryTable;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingWarehouseITCase.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingWarehouseITCase.java
index 8b414379c..9c21f491c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/StreamingWarehouseITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/kafka/StreamingWarehouseITCase.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.paimon.flink;
+package org.apache.paimon.flink.kafka;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.api.common.JobStatus;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java
similarity index 100%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java
rename to paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/source/LogHybridSourceFactoryTest.java
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index f94738206..87b985005 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -73,13 +73,6 @@ under the License.
<version>${frocksdbjni.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -219,14 +212,6 @@ under the License.
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>${flink.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
@@ -247,13 +232,6 @@ under the License.
<include>org.apache.paimon:paimon-bundle</include>
</includes>
</artifactSet>
- <relocations>
- <!-- Same as flink-sql-connector-kafka. -->
- <relocation>
- <pattern>org.apache.kafka</pattern>
- <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
- </relocation>
- </relocations>
</configuration>
</execution>
</executions>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index a9d72f003..31acd85b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -157,7 +157,7 @@ public abstract class AbstractFlinkTableFactory
// ~ Tools ------------------------------------------------------------------
- static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
+ public static Optional<LogStoreTableFactory> createOptionalLogStoreFactory(
DynamicTableFactory.Context context) {
return createOptionalLogStoreFactory(
context.getClassLoader(), context.getCatalogTable().getOptions());
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1e014e785..847233822 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.flink.kafka.KafkaLogStoreFactory
-
### action factories
org.apache.paimon.flink.action.CompactActionFactory
org.apache.paimon.flink.action.CompactDatabaseActionFactory
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
index b2e829dbd..41958a016 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java
@@ -18,31 +18,14 @@
package org.apache.paimon.flink;
-import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
-import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import static org.apache.paimon.CoreOptions.LogChangelogMode;
-import static org.apache.paimon.CoreOptions.LogConsistency;
-import static org.apache.paimon.CoreOptions.SCAN_MODE;
-import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
-import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
-import static org.apache.paimon.CoreOptions.StartupMode;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link AbstractFlinkTableFactory}. */
@@ -70,42 +53,6 @@ public class AbstractFlinkTableFactoryTest {
true);
}
- @ParameterizedTest
- @EnumSource(StartupMode.class)
- public void testCreateKafkaLogStoreFactory(StartupMode startupMode) {
- Map<String, String> dynamicOptions = new HashMap<>();
- dynamicOptions.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
- dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
- if (startupMode == StartupMode.FROM_SNAPSHOT
- || startupMode == StartupMode.FROM_SNAPSHOT_FULL) {
- dynamicOptions.put(SCAN_SNAPSHOT_ID.key(), "1");
- } else if (startupMode == StartupMode.FROM_TIMESTAMP) {
- dynamicOptions.put(
- SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
- }
- dynamicOptions.put(SCAN_MODE.key(), startupMode.toString());
- DynamicTableFactory.Context context =
- KafkaLogTestUtils.testContext(
- "table",
- "",
- LogChangelogMode.AUTO,
- LogConsistency.TRANSACTIONAL,
- RowType.of(new IntType(), new IntType()),
- new int[] {0},
- dynamicOptions);
-
- try {
- Optional<LogStoreTableFactory> optional =
- AbstractFlinkTableFactory.createOptionalLogStoreFactory(context);
- assertThat(startupMode)
- .isNotIn(StartupMode.FROM_SNAPSHOT, StartupMode.FROM_SNAPSHOT_FULL);
- assertThat(optional.isPresent()).isTrue();
- assertThat(optional.get()).isInstanceOf(KafkaLogStoreFactory.class);
- } catch (ValidationException e) {
- assertThat(startupMode).isIn(StartupMode.FROM_SNAPSHOT, StartupMode.FROM_SNAPSHOT_FULL);
- }
- }
-
private void innerTest(RowType r1, RowType r2, boolean expectEquals) {
assertThat(AbstractFlinkTableFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
index 71117b413..9022bbb50 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java
@@ -19,8 +19,6 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.kafka.KafkaLogStoreFactory;
-import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.fs.Path;
@@ -39,8 +37,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import javax.annotation.Nullable;
-
import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
@@ -61,15 +57,6 @@ public class ChangelogModeTest {
private void test(Options options, ChangelogMode expectSource,
ChangelogMode expectSink)
throws Exception {
- test(options, expectSource, expectSink, null);
- }
-
- private void test(
- Options options,
- ChangelogMode expectSource,
- ChangelogMode expectSink,
- @Nullable LogStoreTableFactory logStoreTableFactory)
- throws Exception {
new SchemaManager(LocalFileIO.create(), path)
.createTable(
new Schema(
@@ -80,8 +67,7 @@ public class ChangelogModeTest {
""));
FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), path);
- DataTableSource source =
- new DataTableSource(identifier, table, true, null, logStoreTableFactory);
+ DataTableSource source = new DataTableSource(identifier, table, true, null, null);
assertThat(source.getChangelogMode()).isEqualTo(expectSource);
FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null);
@@ -106,11 +92,4 @@ public class ChangelogModeTest {
options.set(CoreOptions.LOG_CHANGELOG_MODE,
CoreOptions.LogChangelogMode.ALL);
test(options, ChangelogMode.all(), ChangelogMode.all());
}
-
- @Test
- public void testInputChangelogProducerWithLog() throws Exception {
- Options options = new Options();
- options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
- test(options, ChangelogMode.upsert(), ChangelogMode.all(), new KafkaLogStoreFactory());
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 9374bc8ba..00a47cd80 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -21,37 +21,48 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.kafka.KafkaTableTestBase;
+import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for {@link FlinkCatalog}. */
-public class FileSystemCatalogITCase extends KafkaTableTestBase {
+public class FileSystemCatalogITCase extends AbstractTestBase {
- private String path;
private static final String DB_NAME = "default";
+ private String path;
+ private StreamTableEnvironment tEnv;
+
@BeforeEach
- public void before() throws IOException {
+ public void setup() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.setParallelism(1);
+
+ tEnv = StreamTableEnvironment.create(env);
+ tEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false);
path = getTempDirPath();
tEnv.executeSql(
String.format("CREATE CATALOG fs WITH ('type'='paimon', 'warehouse'='%s')", path));
- env.setParallelism(1);
}
@Test
@@ -91,58 +102,6 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
assertThat(result).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
}
- @Test
- public void testLogWriteRead() throws Exception {
- String topic = UUID.randomUUID().toString();
-
- try {
- tEnv.useCatalog("fs");
- tEnv.executeSql(
- String.format(
- "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
- + "'log.system'='kafka', "
- + "'kafka.bootstrap.servers'='%s',"
- + "'kafka.topic'='%s'"
- + ")",
- getBootstrapServers(), topic));
- innerTestWriteRead();
- } finally {
- deleteTopicIfExists(topic);
- }
- }
-
- @Test
- public void testLogWriteReadWithVirtual() throws Exception {
- String topic = UUID.randomUUID().toString();
- createTopicIfNotExists(topic, 1);
-
- try {
- tEnv.useCatalog("fs");
- tEnv.executeSql(
- String.format(
- "CREATE TABLE T ("
- + "a STRING, "
- + "b STRING, "
- + "c STRING, "
- + "d AS CAST(c as INT) + 1"
- + ") WITH ("
- + "'log.system'='kafka', "
- + "'kafka.bootstrap.servers'='%s',"
- + "'kafka.topic'='%s'"
- + ")",
- getBootstrapServers(), topic));
-
- tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
- BlockingIterator<Row, Row> iterator =
- BlockingIterator.of(tEnv.from("T").execute().collect());
- List<Row> result = iterator.collectAndClose(2);
- assertThat(result)
- .containsExactlyInAnyOrder(Row.of("1", "2", "3", 4), Row.of("4", "5", "6", 7));
- } finally {
- deleteTopicIfExists(topic);
- }
- }
-
@Test
public void testCatalogOptionsInheritAndOverride() throws Exception {
tEnv.executeSql(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index b87ca1277..44783009d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -19,9 +19,7 @@
package org.apache.paimon.flink.util;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.WriteMode;
import org.apache.paimon.flink.ReadWriteTableITCase;
-import org.apache.paimon.flink.StreamingReadWriteTableWithKafkaLogITCase;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -48,17 +46,9 @@ import java.util.concurrent.TimeoutException;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
-import static org.apache.paimon.CoreOptions.WRITE_MODE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
-import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
-import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
-import static org.apache.paimon.flink.kafka.KafkaTableTestBase.createTopicIfNotExists;
-import static org.apache.paimon.flink.kafka.KafkaTableTestBase.getBootstrapServers;
import static org.assertj.core.api.Assertions.assertThat;
-/**
- * Test util for {@link ReadWriteTableITCase} and {@link StreamingReadWriteTableWithKafkaLogITCase}.
- */
+/** Test util for {@link ReadWriteTableITCase}. */
public class ReadWriteTableTestUtil {
private static final Time TIME_OUT = Time.seconds(10);
@@ -135,34 +125,6 @@ public class ReadWriteTableTestUtil {
return table;
}
- public static String createTableWithKafkaLog(
- List<String> fieldsSpec,
- List<String> primaryKeys,
- List<String> partitionKeys,
- boolean manuallyCreateLogTable) {
- String topic = "topic_" + UUID.randomUUID();
- String table =
- createTable(
- fieldsSpec,
- primaryKeys,
- partitionKeys,
- new HashMap<String, String>() {
- {
- put(LOG_SYSTEM.key(), "kafka");
- put(BOOTSTRAP_SERVERS.key(), getBootstrapServers());
- put(TOPIC.key(), topic);
- put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
- put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
- }
- });
-
- if (manuallyCreateLogTable) {
- createTopicIfNotExists(topic, 1);
- }
-
- return table;
- }
-
public static String createTemporaryTable(
List<String> fieldsSpec,
List<String> primaryKeys,