This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 47bc30fd1 [flink] Bump flink version to 1.19 (#3049)
47bc30fd1 is described below
commit 47bc30fd13f4e05798f35ff87a0dc83cb3ae67d6
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 20 17:36:07 2024 +0800
[flink] Bump flink version to 1.19 (#3049)
---
.github/workflows/e2e-tests-1.18-jdk11.yml | 4 +-
.github/workflows/e2e-tests-1.18.yml | 2 +-
...sts-1.18-jdk11.yml => e2e-tests-1.19-jdk11.yml} | 6 +--
.../{e2e-tests-1.18.yml => e2e-tests-1.19.yml} | 6 +--
.github/workflows/unitcase-flink-jdk11.yml | 7 ++-
.github/workflows/utitcase-flink.yml | 2 +-
paimon-e2e-tests/pom.xml | 12 +++--
.../paimon/flink/utils/ManagedMemoryUtils.java | 0
.../paimon/flink/ContinuousFileStoreITCase.java | 20 ++++++++
paimon-flink/paimon-flink-1.16/pom.xml | 9 +++-
.../paimon/flink/utils/ManagedMemoryUtils.java | 0
.../paimon/flink/ContinuousFileStoreITCase.java | 59 ++++++++++++++++++++++
paimon-flink/paimon-flink-1.17/pom.xml | 38 ++++++++++++++
.../paimon/flink/utils/ManagedMemoryUtils.java | 0
.../paimon/flink/ContinuousFileStoreITCase.java | 59 ++++++++++++++++++++++
paimon-flink/paimon-flink-1.18/pom.xml | 44 ++++++++++++++++
.../paimon/flink/utils/ManagedMemoryUtils.java | 0
.../paimon/flink/ContinuousFileStoreITCase.java | 59 ++++++++++++++++++++++
.../pom.xml | 6 +--
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 3 +-
paimon-flink/paimon-flink-common/pom.xml | 2 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 14 +----
.../flink/sink/RowDataStoreWriteOperator.java | 8 ++-
.../paimon/flink/utils/ManagedMemoryUtils.java | 1 +
.../apache/paimon/flink/RescaleBucketITCase.java | 14 +++--
paimon-flink/pom.xml | 6 +++
paimon-hive/paimon-hive-catalog/pom.xml | 25 ---------
paimon-hive/paimon-hive-connector-2.3/pom.xml | 29 ++---------
paimon-hive/paimon-hive-connector-3.1/pom.xml | 29 ++---------
paimon-hive/paimon-hive-connector-common/pom.xml | 43 +++++-----------
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 4 ++
paimon-hive/pom.xml | 42 ++++++++++++++-
pom.xml | 5 +-
33 files changed, 412 insertions(+), 146 deletions(-)
diff --git a/.github/workflows/e2e-tests-1.18-jdk11.yml
b/.github/workflows/e2e-tests-1.18-jdk11.yml
index a8b42a6df..b924a3b07 100644
--- a/.github/workflows/e2e-tests-1.18-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.18-jdk11.yml
@@ -46,13 +46,13 @@ jobs:
distribution: 'adopt'
- name: Build Flink 1.18
run: mvn -T 1C -B clean install -DskipTests
- - name: Test Flink 1.17
+ - name: Test Flink 1.18
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
-Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.18.yml
b/.github/workflows/e2e-tests-1.18.yml
index 2985b45c4..2f566004a 100644
--- a/.github/workflows/e2e-tests-1.18.yml
+++ b/.github/workflows/e2e-tests-1.18.yml
@@ -52,6 +52,6 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
-Pflink-1.18
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.18-jdk11.yml
b/.github/workflows/e2e-tests-1.19-jdk11.yml
similarity index 93%
copy from .github/workflows/e2e-tests-1.18-jdk11.yml
copy to .github/workflows/e2e-tests-1.19-jdk11.yml
index a8b42a6df..bc917f453 100644
--- a/.github/workflows/e2e-tests-1.18-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.19-jdk11.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink 1.18 on JDK 11
+name: End to End Tests Flink 1.19 on JDK 11
on:
issue_comment:
@@ -44,9 +44,9 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- - name: Build Flink 1.18
+ - name: Build Flink 1.19
run: mvn -T 1C -B clean install -DskipTests
- - name: Test Flink 1.17
+ - name: Test Flink 1.19
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/e2e-tests-1.18.yml
b/.github/workflows/e2e-tests-1.19.yml
similarity index 93%
copy from .github/workflows/e2e-tests-1.18.yml
copy to .github/workflows/e2e-tests-1.19.yml
index 2985b45c4..b451d6385 100644
--- a/.github/workflows/e2e-tests-1.18.yml
+++ b/.github/workflows/e2e-tests-1.19.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink 1.18
+name: End to End Tests Flink 1.19
on:
push:
@@ -43,9 +43,9 @@ jobs:
with:
java-version: ${{ env.JDK_VERSION }}
distribution: 'adopt'
- - name: Build Flink 1.18
+ - name: Build Flink 1.19
run: mvn -T 1C -B clean install -DskipTests
- - name: Test Flink 1.18
+ - name: Test Flink 1.19
timeout-minutes: 60
run: |
# run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/unitcase-flink-jdk11.yml
b/.github/workflows/unitcase-flink-jdk11.yml
index 59dd1457c..135dc5718 100644
--- a/.github/workflows/unitcase-flink-jdk11.yml
+++ b/.github/workflows/unitcase-flink-jdk11.yml
@@ -52,6 +52,11 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B clean install -pl
'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+ test_modules=""
+ for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
+ test_modules+="org.apache.paimon:paimon-flink-${suffix},"
+ done
+ test_modules="${test_modules%,}"
+ mvn -T 1C -B clean install -pl "${test_modules}"
-Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink.yml
b/.github/workflows/utitcase-flink.yml
index b6cc9cbd1..c7455e814 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -52,7 +52,7 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
- for suffix in 1.15 1.16 1.17 1.18 common; do
+ for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 604eb04c7..fdcdbda7a 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -34,7 +34,6 @@ under the License.
<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
<flink.cdc.version>2.3.0</flink.cdc.version>
-
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
</properties>
@@ -210,7 +209,7 @@ under the License.
<!-- test paimon with kafka sql jar -->
<artifactItem>
<groupId>org.apache.flink</groupId>
-
<artifactId>${flink.sql.connector.kafka}</artifactId>
+ <artifactId>flink-sql-connector-kafka</artifactId>
<version>${test.flink.connector.kafka.version}</version>
<destFileName>flink-sql-connector-kafka.jar</destFileName>
<type>jar</type>
@@ -276,6 +275,14 @@ under the License.
<profiles>
<!-- Activate these profiles with -Pflink-x.xx to build and test
against different Flink versions -->
+ <profile>
+ <id>flink-1.18</id>
+ <properties>
+ <test.flink.main.version>1.18</test.flink.main.version>
+ <test.flink.version>1.18.1</test.flink.version>
+ </properties>
+ </profile>
+
<profile>
<id>flink-1.17</id>
<properties>
@@ -300,7 +307,6 @@ under the License.
<test.flink.main.version>1.15</test.flink.main.version>
<test.flink.version>1.15.3</test.flink.version>
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
-
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
</properties>
</profile>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to
paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index e1c62e28f..5e9b7c7ce 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -329,4 +329,24 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
"SELECT * FROM T1 /*+
OPTIONS('log.consistency'='eventual') */"),
"File store continuous reading does not support eventual
consistency mode");
}
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'),
('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater
than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
}
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml
b/paimon-flink/paimon-flink-1.16/pom.xml
index 9aaa19228..3558309e6 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -64,7 +64,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-common</artifactId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -93,6 +93,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to
paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000..84c84d1c6
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)
WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' =
'1')");
+ }
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'),
('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater
than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.17/pom.xml
b/paimon-flink/paimon-flink-1.17/pom.xml
index 454386d59..11318dbab 100644
--- a/paimon-flink/paimon-flink-1.17/pom.xml
+++ b/paimon-flink/paimon-flink-1.17/pom.xml
@@ -68,6 +68,44 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to
paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git
a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000..84c84d1c6
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)
WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' =
'1')");
+ }
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'),
('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater
than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml
b/paimon-flink/paimon-flink-1.18/pom.xml
index a0f83fa0d..31e85df3b 100644
--- a/paimon-flink/paimon-flink-1.18/pom.xml
+++ b/paimon-flink/paimon-flink-1.18/pom.xml
@@ -42,6 +42,12 @@ under the License.
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -55,6 +61,44 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to
paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000..84c84d1c6
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)
WITH ('bucket' = '1')",
+ "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
+ + " WITH ('changelog-producer'='input', 'bucket' =
'1')");
+ }
+
+ @Test
+ public void testFlinkMemoryPool() {
+ // Check if the configuration is effective
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='0M') */ "
+ + "VALUES ('1', '2', '3'),
('4', '5', '6')",
+ "T1"))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Weights for operator scope use cases must be greater
than 0.");
+
+ batchSql(
+ "INSERT INTO %s /*+
OPTIONS('sink.use-managed-memory-allocator'='true',
'sink.managed.writer-buffer-memory'='1M') */ "
+ + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+ "T1");
+ assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml
b/paimon-flink/paimon-flink-1.19/pom.xml
similarity index 95%
copy from paimon-flink/paimon-flink-1.18/pom.xml
copy to paimon-flink/paimon-flink-1.19/pom.xml
index a0f83fa0d..60a33df24 100644
--- a/paimon-flink/paimon-flink-1.18/pom.xml
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -30,11 +30,11 @@ under the License.
<packaging>jar</packaging>
- <artifactId>paimon-flink-1.18</artifactId>
- <name>Paimon : Flink : 1.18</name>
+ <artifactId>paimon-flink-1.19</artifactId>
+ <name>Paimon : Flink : 1.19</name>
<properties>
- <flink.version>1.18.1</flink.version>
+ <flink.version>1.19.0</flink.version>
</properties>
<dependencies>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c0156e841..9cba78f21 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -131,8 +131,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
createCommitterFactory(),
createCommittableStateManager()))
.setParallelism(input.getParallelism());
- configureGlobalCommitter(
- committed, commitCpuCores, commitHeapMemory,
env.getConfiguration());
+ configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
}
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index efca0b1bd..eea8b9b58 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : Common</name>
<properties>
- <flink.version>1.18.1</flink.version>
+ <flink.version>1.19.0</flink.version>
</properties>
<dependencies>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 582fcfc35..97c426ee5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -52,7 +52,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
-import static
org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -261,27 +260,18 @@ public abstract class FlinkSink<T> implements
Serializable {
.setMaxParallelism(1);
Options options = Options.fromMap(table.options());
configureGlobalCommitter(
- committed,
- options.get(SINK_COMMITTER_CPU),
- options.get(SINK_COMMITTER_MEMORY),
- conf);
+ committed, options.get(SINK_COMMITTER_CPU),
options.get(SINK_COMMITTER_MEMORY));
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
}
public static void configureGlobalCommitter(
SingleOutputStreamOperator<?> committed,
double cpuCores,
- @Nullable MemorySize heapMemory,
- ReadableConfig conf) {
+ @Nullable MemorySize heapMemory) {
if (heapMemory == null) {
return;
}
- if (!conf.get(ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) {
- throw new RuntimeException(
- "To support the 'sink.committer-cpu' and
'sink.committer-memory' configurations, you must enable fine-grained resource
management. Please set 'cluster.fine-grained-resource-management.enabled' to
'true' in your Flink configuration.");
- }
-
SlotSharingGroup slotSharingGroup =
SlotSharingGroup.newBuilder(committed.getName())
.setCpuCores(cpuCores)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index a5c8e5557..b61fecab5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
@@ -96,7 +97,12 @@ public class RowDataStoreWriteOperator extends
TableWriteOperator<InternalRow> {
this.sinkContext = new SimpleContext(getProcessingTimeService());
if (logSinkFunction != null) {
- FunctionUtils.openFunction(logSinkFunction, new Configuration());
+ // to stay compatible with Flink 1.18-
+ if (logSinkFunction instanceof RichFunction) {
+ RichFunction richFunction = (RichFunction) logSinkFunction;
+ richFunction.open(new Configuration());
+ }
+
logCallback = new LogWriteCallback();
logSinkFunction.setWriteCallback(logCallback);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
index a51d9e02e..82964e41c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -43,6 +43,7 @@ public class ManagedMemoryUtils {
operator.getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
environment.getTaskManagerInfo().getConfiguration(),
environment.getUserCodeClassLoader().asClassLoader()));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index 841a24ba6..a559d3350 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.paimon.CoreOptions.BUCKET;
@@ -80,13 +81,11 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
+ "INSERT INTO `T4` SELECT * FROM `S0`;\n"
+ "END";
-
sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH,
path);
-
// step1: run streaming insert
JobClient jobClient = startJobAndCommitSnapshot(streamSql, null);
// step2: stop with savepoint
- stopJobSafely(jobClient);
+ String savepointPath = stopJobSafely(jobClient);
final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3");
assertThat(snapshotBeforeRescale).isNotNull();
@@ -107,6 +106,9 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM
T3")).containsExactlyInAnyOrderElementsOf(committedData);
// step5: resume streaming job
+ sEnv.getConfig()
+ .getConfiguration()
+ .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
JobClient resumedJobClient =
startJobAndCommitSnapshot(streamSql,
snapshotAfterRescale.id());
// stop job
@@ -144,11 +146,13 @@ public class RescaleBucketITCase extends
CatalogITCaseBase {
return jobClient;
}
- private void stopJobSafely(JobClient client) throws ExecutionException,
InterruptedException {
- client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT);
+ private String stopJobSafely(JobClient client) throws ExecutionException,
InterruptedException {
+ CompletableFuture<String> savepointPath =
+ client.stopWithSavepoint(true, path,
SavepointFormatType.DEFAULT);
while (!client.getJobStatus().get().isGloballyTerminalState()) {
Thread.sleep(2000L);
}
+ return savepointPath.get();
}
private void assertLatestSchema(
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index 30ef22d8c..caeb2fcea 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
<module>paimon-flink-1.16</module>
<module>paimon-flink-1.17</module>
<module>paimon-flink-1.18</module>
+ <module>paimon-flink-1.19</module>
<module>paimon-flink-action</module>
<module>paimon-flink-cdc</module>
</modules>
@@ -95,6 +96,10 @@ under the License.
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -162,6 +167,7 @@ under the License.
<module>paimon-flink-1.16</module>
<module>paimon-flink-1.17</module>
<module>paimon-flink-1.18</module>
+ <module>paimon-flink-1.19</module>
<module>paimon-flink-cdc</module>
</modules>
<build>
diff --git a/paimon-hive/paimon-hive-catalog/pom.xml
b/paimon-hive/paimon-hive-catalog/pom.xml
index b50e58563..f4c9a810b 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -87,31 +87,6 @@ under the License.
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml
b/paimon-hive/paimon-hive-connector-2.3/pom.xml
index 0f4733f00..8f2005e56 100644
--- a/paimon-hive/paimon-hive-connector-2.3/pom.xml
+++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml
@@ -127,31 +127,6 @@ under the License.
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@@ -554,6 +529,10 @@ under the License.
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml
b/paimon-hive/paimon-hive-connector-3.1/pom.xml
index 1792f032f..d52bc0b0b 100644
--- a/paimon-hive/paimon-hive-connector-3.1/pom.xml
+++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml
@@ -141,31 +141,6 @@ under the License.
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@@ -580,6 +555,10 @@ under the License.
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml
b/paimon-hive/paimon-hive-connector-common/pom.xml
index a9939854c..7d5f1e551 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -46,35 +46,6 @@ under the License.
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@@ -519,6 +490,12 @@ under the License.
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!--
@@ -574,6 +551,14 @@ under the License.
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index aacd9087c..668c88f1f 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -176,6 +176,10 @@ public abstract class HiveCatalogITCaseBase {
tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2,
'Hello')").await();
Path tablePath = new Path(path, "test_db2.db/t");
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
+ assertThatThrownBy(() -> tEnv.executeSql("DROP DATABASE
test_db2").await())
+ .hasRootCauseInstanceOf(ValidationException.class)
+ .hasRootCauseMessage("Cannot drop a database which is
currently in use.");
+ tEnv.executeSql("USE test_db");
assertThatThrownBy(() -> tEnv.executeSql("DROP DATABASE
test_db2").await())
.hasRootCauseInstanceOf(DatabaseNotEmptyException.class)
.hasRootCauseMessage("Database test_db2 in catalog my_hive is
not empty.");
diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml
index 8e1273daa..531591b69 100644
--- a/paimon-hive/pom.xml
+++ b/paimon-hive/pom.xml
@@ -52,6 +52,47 @@ under the License.
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-s3</artifactId>
@@ -81,7 +122,6 @@ under the License.
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-test-utils</artifactId>
diff --git a/pom.xml b/pom.xml
index 238398e20..6fdda9566 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,8 +101,9 @@ under the License.
<!-- Can be set to any value to reproduce a specific build. -->
<test.randomization.seed/>
<test.unit.pattern>**/*Test.*</test.unit.pattern>
- <test.flink.main.version>1.18</test.flink.main.version>
- <test.flink.version>1.18.1</test.flink.version>
+ <test.flink.main.version>1.19</test.flink.main.version>
+ <test.flink.version>1.19.0</test.flink.version>
+ <!-- TODO upgrade after connector releases x-1.19 version -->
<test.flink.connector.kafka.version>3.0.1-1.18</test.flink.connector.kafka.version>
<janino.version>3.0.11</janino.version>