This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 47bc30fd1 [flink] Bump flink version to 1.19 (#3049)
47bc30fd1 is described below

commit 47bc30fd13f4e05798f35ff87a0dc83cb3ae67d6
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 20 17:36:07 2024 +0800

    [flink] Bump flink version to 1.19 (#3049)
---
 .github/workflows/e2e-tests-1.18-jdk11.yml         |  4 +-
 .github/workflows/e2e-tests-1.18.yml               |  2 +-
 ...sts-1.18-jdk11.yml => e2e-tests-1.19-jdk11.yml} |  6 +--
 .../{e2e-tests-1.18.yml => e2e-tests-1.19.yml}     |  6 +--
 .github/workflows/unitcase-flink-jdk11.yml         |  7 ++-
 .github/workflows/utitcase-flink.yml               |  2 +-
 paimon-e2e-tests/pom.xml                           | 12 +++--
 .../paimon/flink/utils/ManagedMemoryUtils.java     |  0
 .../paimon/flink/ContinuousFileStoreITCase.java    | 20 ++++++++
 paimon-flink/paimon-flink-1.16/pom.xml             |  9 +++-
 .../paimon/flink/utils/ManagedMemoryUtils.java     |  0
 .../paimon/flink/ContinuousFileStoreITCase.java    | 59 ++++++++++++++++++++++
 paimon-flink/paimon-flink-1.17/pom.xml             | 38 ++++++++++++++
 .../paimon/flink/utils/ManagedMemoryUtils.java     |  0
 .../paimon/flink/ContinuousFileStoreITCase.java    | 59 ++++++++++++++++++++++
 paimon-flink/paimon-flink-1.18/pom.xml             | 44 ++++++++++++++++
 .../paimon/flink/utils/ManagedMemoryUtils.java     |  0
 .../paimon/flink/ContinuousFileStoreITCase.java    | 59 ++++++++++++++++++++++
 .../pom.xml                                        |  6 +--
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  3 +-
 paimon-flink/paimon-flink-common/pom.xml           |  2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    | 14 +----
 .../flink/sink/RowDataStoreWriteOperator.java      |  8 ++-
 .../paimon/flink/utils/ManagedMemoryUtils.java     |  1 +
 .../apache/paimon/flink/RescaleBucketITCase.java   | 14 +++--
 paimon-flink/pom.xml                               |  6 +++
 paimon-hive/paimon-hive-catalog/pom.xml            | 25 ---------
 paimon-hive/paimon-hive-connector-2.3/pom.xml      | 29 ++---------
 paimon-hive/paimon-hive-connector-3.1/pom.xml      | 29 ++---------
 paimon-hive/paimon-hive-connector-common/pom.xml   | 43 +++++-----------
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  4 ++
 paimon-hive/pom.xml                                | 42 ++++++++++++++-
 pom.xml                                            |  5 +-
 33 files changed, 412 insertions(+), 146 deletions(-)

diff --git a/.github/workflows/e2e-tests-1.18-jdk11.yml 
b/.github/workflows/e2e-tests-1.18-jdk11.yml
index a8b42a6df..b924a3b07 100644
--- a/.github/workflows/e2e-tests-1.18-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.18-jdk11.yml
@@ -46,13 +46,13 @@ jobs:
           distribution: 'adopt'
       - name: Build Flink 1.18
         run: mvn -T 1C -B clean install -DskipTests
-      - name: Test Flink 1.17
+      - name: Test Flink 1.18
         timeout-minutes: 60
         run: |
           # run tests with random timezone to find out timezone related bugs
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.18.yml 
b/.github/workflows/e2e-tests-1.18.yml
index 2985b45c4..2f566004a 100644
--- a/.github/workflows/e2e-tests-1.18.yml
+++ b/.github/workflows/e2e-tests-1.18.yml
@@ -52,6 +52,6 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.18
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-1.18-jdk11.yml 
b/.github/workflows/e2e-tests-1.19-jdk11.yml
similarity index 93%
copy from .github/workflows/e2e-tests-1.18-jdk11.yml
copy to .github/workflows/e2e-tests-1.19-jdk11.yml
index a8b42a6df..bc917f453 100644
--- a/.github/workflows/e2e-tests-1.18-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.19-jdk11.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: End to End Tests Flink 1.18 on JDK 11
+name: End to End Tests Flink 1.19 on JDK 11
 
 on:
   issue_comment:
@@ -44,9 +44,9 @@ jobs:
         with:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
-      - name: Build Flink 1.18
+      - name: Build Flink 1.19
         run: mvn -T 1C -B clean install -DskipTests
-      - name: Test Flink 1.17
+      - name: Test Flink 1.19
         timeout-minutes: 60
         run: |
           # run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/e2e-tests-1.18.yml 
b/.github/workflows/e2e-tests-1.19.yml
similarity index 93%
copy from .github/workflows/e2e-tests-1.18.yml
copy to .github/workflows/e2e-tests-1.19.yml
index 2985b45c4..b451d6385 100644
--- a/.github/workflows/e2e-tests-1.18.yml
+++ b/.github/workflows/e2e-tests-1.19.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: End to End Tests Flink 1.18
+name: End to End Tests Flink 1.19
 
 on:
   push:
@@ -43,9 +43,9 @@ jobs:
         with:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'adopt'
-      - name: Build Flink 1.18
+      - name: Build Flink 1.19
         run: mvn -T 1C -B clean install -DskipTests
-      - name: Test Flink 1.18
+      - name: Test Flink 1.19
         timeout-minutes: 60
         run: |
           # run tests with random timezone to find out timezone related bugs
diff --git a/.github/workflows/unitcase-flink-jdk11.yml 
b/.github/workflows/unitcase-flink-jdk11.yml
index 59dd1457c..135dc5718 100644
--- a/.github/workflows/unitcase-flink-jdk11.yml
+++ b/.github/workflows/unitcase-flink-jdk11.yml
@@ -52,6 +52,11 @@ jobs:
           . .github/workflows/utils.sh
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
-          mvn -T 1C -B clean install -pl 'org.apache.paimon:paimon-flink-common' -Duser.timezone=$jvm_timezone
+          test_modules=""
+          for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
+          test_modules+="org.apache.paimon:paimon-flink-${suffix},"
+          done
+          test_modules="${test_modules%,}"
+          mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink.yml 
b/.github/workflows/utitcase-flink.yml
index b6cc9cbd1..c7455e814 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink.yml
@@ -52,7 +52,7 @@ jobs:
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
           test_modules=""
-          for suffix in 1.15 1.16 1.17 1.18 common; do
+          for suffix in 1.15 1.16 1.17 1.18 1.19 common; do
             test_modules+="org.apache.paimon:paimon-flink-${suffix}," 
           done
           test_modules="${test_modules%,}"
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 604eb04c7..fdcdbda7a 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -34,7 +34,6 @@ under the License.
     <properties>
         <flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
         <flink.cdc.version>2.3.0</flink.cdc.version>
-        
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
         
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
     </properties>
 
@@ -210,7 +209,7 @@ under the License.
                         <!-- test paimon with kafka sql jar -->
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
-                            
<artifactId>${flink.sql.connector.kafka}</artifactId>
+                            <artifactId>flink-sql-connector-kafka</artifactId>
                             
<version>${test.flink.connector.kafka.version}</version>
                             
<destFileName>flink-sql-connector-kafka.jar</destFileName>
                             <type>jar</type>
@@ -276,6 +275,14 @@ under the License.
 
     <profiles>
         <!-- Activate these profiles with -Pflink-x.xx to build and test 
against different Flink versions -->
+        <profile>
+            <id>flink-1.18</id>
+            <properties>
+                <test.flink.main.version>1.18</test.flink.main.version>
+                <test.flink.version>1.18.1</test.flink.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>flink-1.17</id>
             <properties>
@@ -300,7 +307,6 @@ under the License.
                 <test.flink.main.version>1.15</test.flink.main.version>
                 <test.flink.version>1.15.3</test.flink.version>
                 
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
-                
<flink.sql.connector.kafka>flink-sql-connector-kafka</flink.sql.connector.kafka>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
             </properties>
         </profile>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to 
paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index e1c62e28f..5e9b7c7ce 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -329,4 +329,24 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
                                 "SELECT * FROM T1 /*+ 
OPTIONS('log.consistency'='eventual') */"),
                 "File store continuous reading does not support eventual 
consistency mode");
     }
+
+    @Test
+    public void testFlinkMemoryPool() {
+        // Check if the configuration is effective
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='0M') */ "
+                                                + "VALUES ('1', '2', '3'), 
('4', '5', '6')",
+                                        "T1"))
+                .hasCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Weights for operator scope use cases must be greater 
than 0.");
+
+        batchSql(
+                "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='1M') */ "
+                        + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+                "T1");
+        assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+    }
 }
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml 
b/paimon-flink/paimon-flink-1.16/pom.xml
index 9aaa19228..3558309e6 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -64,7 +64,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -93,6 +93,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
 
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to 
paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git 
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000..84c84d1c6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) 
WITH ('bucket' = '1')",
+                "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
+                        + " WITH ('changelog-producer'='input', 'bucket' = 
'1')");
+    }
+
+    @Test
+    public void testFlinkMemoryPool() {
+        // Check if the configuration is effective
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='0M') */ "
+                                                + "VALUES ('1', '2', '3'), 
('4', '5', '6')",
+                                        "T1"))
+                .hasCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Weights for operator scope use cases must be greater 
than 0.");
+
+        batchSql(
+                "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='1M') */ "
+                        + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+                "T1");
+        assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.17/pom.xml 
b/paimon-flink/paimon-flink-1.17/pom.xml
index 454386d59..11318dbab 100644
--- a/paimon-flink/paimon-flink-1.17/pom.xml
+++ b/paimon-flink/paimon-flink-1.17/pom.xml
@@ -68,6 +68,44 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
 
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to 
paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git 
a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000..84c84d1c6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) 
WITH ('bucket' = '1')",
+                "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
+                        + " WITH ('changelog-producer'='input', 'bucket' = 
'1')");
+    }
+
+    @Test
+    public void testFlinkMemoryPool() {
+        // Check if the configuration is effective
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='0M') */ "
+                                                + "VALUES ('1', '2', '3'), 
('4', '5', '6')",
+                                        "T1"))
+                .hasCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Weights for operator scope use cases must be greater 
than 0.");
+
+        batchSql(
+                "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='1M') */ "
+                        + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+                "T1");
+        assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml 
b/paimon-flink/paimon-flink-1.18/pom.xml
index a0f83fa0d..31e85df3b 100644
--- a/paimon-flink/paimon-flink-1.18/pom.xml
+++ b/paimon-flink/paimon-flink-1.18/pom.xml
@@ -42,6 +42,12 @@ under the License.
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-flink-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -55,6 +61,44 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
similarity index 100%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
copy to 
paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
new file mode 100644
index 000000000..84c84d1c6
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** SQL ITCase for continuous file store. */
+public class ContinuousFileStoreITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING) 
WITH ('bucket' = '1')",
+                "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
+                        + " WITH ('changelog-producer'='input', 'bucket' = 
'1')");
+    }
+
+    @Test
+    public void testFlinkMemoryPool() {
+        // Check if the configuration is effective
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='0M') */ "
+                                                + "VALUES ('1', '2', '3'), 
('4', '5', '6')",
+                                        "T1"))
+                .hasCauseInstanceOf(IllegalArgumentException.class)
+                .hasRootCauseMessage(
+                        "Weights for operator scope use cases must be greater 
than 0.");
+
+        batchSql(
+                "INSERT INTO %s /*+ 
OPTIONS('sink.use-managed-memory-allocator'='true', 
'sink.managed.writer-buffer-memory'='1M') */ "
+                        + "VALUES ('1', '2', '3'), ('4', '5', '6')",
+                "T1");
+        assertThat(batchSql("SELECT * FROM T1").size()).isEqualTo(2);
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml 
b/paimon-flink/paimon-flink-1.19/pom.xml
similarity index 95%
copy from paimon-flink/paimon-flink-1.18/pom.xml
copy to paimon-flink/paimon-flink-1.19/pom.xml
index a0f83fa0d..60a33df24 100644
--- a/paimon-flink/paimon-flink-1.18/pom.xml
+++ b/paimon-flink/paimon-flink-1.19/pom.xml
@@ -30,11 +30,11 @@ under the License.
 
     <packaging>jar</packaging>
 
-    <artifactId>paimon-flink-1.18</artifactId>
-    <name>Paimon : Flink : 1.18</name>
+    <artifactId>paimon-flink-1.19</artifactId>
+    <name>Paimon : Flink : 1.19</name>
 
     <properties>
-        <flink.version>1.18.1</flink.version>
+        <flink.version>1.19.0</flink.version>
     </properties>
 
     <dependencies>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index c0156e841..9cba78f21 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -131,8 +131,7 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
                                         createCommitterFactory(),
                                         createCommittableStateManager()))
                         .setParallelism(input.getParallelism());
-        configureGlobalCommitter(
-                committed, commitCpuCores, commitHeapMemory, 
env.getConfiguration());
+        configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index efca0b1bd..eea8b9b58 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>Paimon : Flink : Common</name>
 
     <properties>
-        <flink.version>1.18.1</flink.version>
+        <flink.version>1.19.0</flink.version>
     </properties>
 
     <dependencies>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 582fcfc35..97c426ee5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -52,7 +52,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 
-import static 
org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -261,27 +260,18 @@ public abstract class FlinkSink<T> implements 
Serializable {
                         .setMaxParallelism(1);
         Options options = Options.fromMap(table.options());
         configureGlobalCommitter(
-                committed,
-                options.get(SINK_COMMITTER_CPU),
-                options.get(SINK_COMMITTER_MEMORY),
-                conf);
+                committed, options.get(SINK_COMMITTER_CPU), 
options.get(SINK_COMMITTER_MEMORY));
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
     public static void configureGlobalCommitter(
             SingleOutputStreamOperator<?> committed,
             double cpuCores,
-            @Nullable MemorySize heapMemory,
-            ReadableConfig conf) {
+            @Nullable MemorySize heapMemory) {
         if (heapMemory == null) {
             return;
         }
 
-        if (!conf.get(ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT)) {
-            throw new RuntimeException(
-                    "To support the 'sink.committer-cpu' and 
'sink.committer-memory' configurations, you must enable fine-grained resource 
management. Please set 'cluster.fine-grained-resource-management.enabled' to 
'true' in your Flink configuration.");
-        }
-
         SlotSharingGroup slotSharingGroup =
                 SlotSharingGroup.newBuilder(committed.getName())
                         .setCpuCores(cpuCores)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index a5c8e5557..b61fecab5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.log.LogWriteCallback;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
@@ -96,7 +97,12 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
 
         this.sinkContext = new SimpleContext(getProcessingTimeService());
         if (logSinkFunction != null) {
-            FunctionUtils.openFunction(logSinkFunction, new Configuration());
+            // to stay compatible with Flink 1.18-
+            if (logSinkFunction instanceof RichFunction) {
+                RichFunction richFunction = (RichFunction) logSinkFunction;
+                richFunction.open(new Configuration());
+            }
+
             logCallback = new LogWriteCallback();
             logSinkFunction.setWriteCallback(logCallback);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
index a51d9e02e..82964e41c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/ManagedMemoryUtils.java
@@ -43,6 +43,7 @@ public class ManagedMemoryUtils {
                         operator.getOperatorConfig()
                                 .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                         ManagedMemoryUseCase.OPERATOR,
+                                        environment.getJobConfiguration(),
                                         environment.getTaskManagerInfo().getConfiguration(),
                                         environment.getUserCodeClassLoader().asClassLoader()));
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
index 841a24ba6..a559d3350 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RescaleBucketITCase.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
@@ -80,13 +81,11 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
                         + "INSERT INTO `T4` SELECT * FROM `S0`;\n"
                         + "END";
 
-        sEnv.getConfig().getConfiguration().set(SavepointConfigOptions.SAVEPOINT_PATH, path);
-
         // step1: run streaming insert
         JobClient jobClient = startJobAndCommitSnapshot(streamSql, null);
 
         // step2: stop with savepoint
-        stopJobSafely(jobClient);
+        String savepointPath = stopJobSafely(jobClient);
 
         final Snapshot snapshotBeforeRescale = findLatestSnapshot("T3");
         assertThat(snapshotBeforeRescale).isNotNull();
@@ -107,6 +106,9 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T3")).containsExactlyInAnyOrderElementsOf(committedData);
 
         // step5: resume streaming job
+        sEnv.getConfig()
+                .getConfiguration()
+                .set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
         JobClient resumedJobClient =
                 startJobAndCommitSnapshot(streamSql, snapshotAfterRescale.id());
         // stop job
@@ -144,11 +146,13 @@ public class RescaleBucketITCase extends CatalogITCaseBase {
         return jobClient;
     }
 
-    private void stopJobSafely(JobClient client) throws ExecutionException, InterruptedException {
-        client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT);
+    private String stopJobSafely(JobClient client) throws ExecutionException, InterruptedException {
+        CompletableFuture<String> savepointPath =
+                client.stopWithSavepoint(true, path, SavepointFormatType.DEFAULT);
         while (!client.getJobStatus().get().isGloballyTerminalState()) {
             Thread.sleep(2000L);
         }
+        return savepointPath.get();
     }
 
     private void assertLatestSchema(
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index 30ef22d8c..caeb2fcea 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
         <module>paimon-flink-1.16</module>
         <module>paimon-flink-1.17</module>
         <module>paimon-flink-1.18</module>
+        <module>paimon-flink-1.19</module>
         <module>paimon-flink-action</module>
         <module>paimon-flink-cdc</module>
     </modules>
@@ -95,6 +96,10 @@ under the License.
                     <groupId>com.google.protobuf</groupId>
                     <artifactId>protobuf-java</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -162,6 +167,7 @@ under the License.
                 <module>paimon-flink-1.16</module>
                 <module>paimon-flink-1.17</module>
                 <module>paimon-flink-1.18</module>
+                <module>paimon-flink-1.19</module>
                 <module>paimon-flink-cdc</module>
             </modules>
             <build>
diff --git a/paimon-hive/paimon-hive-catalog/pom.xml b/paimon-hive/paimon-hive-catalog/pom.xml
index b50e58563..f4c9a810b 100644
--- a/paimon-hive/paimon-hive-catalog/pom.xml
+++ b/paimon-hive/paimon-hive-catalog/pom.xml
@@ -87,31 +87,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml
index 0f4733f00..8f2005e56 100644
--- a/paimon-hive/paimon-hive-connector-2.3/pom.xml
+++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml
@@ -127,31 +127,6 @@ under the License.
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
@@ -554,6 +529,10 @@ under the License.
                     <groupId>org.apache.calcite</groupId>
                     <artifactId>calcite-avatica</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml
index 1792f032f..d52bc0b0b 100644
--- a/paimon-hive/paimon-hive-connector-3.1/pom.xml
+++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml
@@ -141,31 +141,6 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
@@ -580,6 +555,10 @@ under the License.
                     <groupId>org.apache.calcite</groupId>
                     <artifactId>calcite-avatica</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index a9939854c..7d5f1e551 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -46,35 +46,6 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
@@ -519,6 +490,12 @@ under the License.
             <artifactId>avro</artifactId>
             <version>${avro.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <!--
@@ -574,6 +551,14 @@ under the License.
                     <groupId>org.apache.calcite</groupId>
                     <artifactId>calcite-avatica</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index aacd9087c..668c88f1f 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -176,6 +176,10 @@ public abstract class HiveCatalogITCaseBase {
         tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await();
         Path tablePath = new Path(path, "test_db2.db/t");
         assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
+        assertThatThrownBy(() -> tEnv.executeSql("DROP DATABASE test_db2").await())
+                .hasRootCauseInstanceOf(ValidationException.class)
+                .hasRootCauseMessage("Cannot drop a database which is currently in use.");
+        tEnv.executeSql("USE test_db");
         assertThatThrownBy(() -> tEnv.executeSql("DROP DATABASE test_db2").await())
                 .hasRootCauseInstanceOf(DatabaseNotEmptyException.class)
                 .hasRootCauseMessage("Database test_db2 in catalog my_hive is not empty.");
diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml
index 8e1273daa..531591b69 100644
--- a/paimon-hive/pom.xml
+++ b/paimon-hive/pom.xml
@@ -52,6 +52,47 @@ under the License.
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-s3</artifactId>
@@ -81,7 +122,6 @@ under the License.
             <scope>test</scope>
         </dependency>
 
-
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-test-utils</artifactId>
diff --git a/pom.xml b/pom.xml
index 238398e20..6fdda9566 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,8 +101,9 @@ under the License.
         <!-- Can be set to any value to reproduce a specific build. -->
         <test.randomization.seed/>
         <test.unit.pattern>**/*Test.*</test.unit.pattern>
-        <test.flink.main.version>1.18</test.flink.main.version>
-        <test.flink.version>1.18.1</test.flink.version>
+        <test.flink.main.version>1.19</test.flink.main.version>
+        <test.flink.version>1.19.0</test.flink.version>
+        <!-- TODO upgrade after connector releases x-1.19 version -->
         <test.flink.connector.kafka.version>3.0.1-1.18</test.flink.connector.kafka.version>
 
         <janino.version>3.0.11</janino.version>

Reply via email to