This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit b3ebcf109a3e94727051908995e3364fa2b89039
Author: Rodrigo Meneses <[email protected]>
AuthorDate: Mon Apr 15 10:03:00 2024 -0700

    Flink: Refactoring code and properties to make Flink 1.19 to work
---
 .github/workflows/flink-ci.yml                     |  2 +-
 dev/stage-binaries.sh                              |  2 +-
 flink/build.gradle                                 |  7 +++--
 flink/v1.19/build.gradle                           | 36 +++++++++++-----------
 .../apache/iceberg/flink/FlinkCatalogFactory.java  |  2 +-
 .../org/apache/iceberg/flink/FlinkTestBase.java    | 18 +++++++++++
 .../java/org/apache/iceberg/flink/TestBase.java    | 20 +++++++++++-
 .../apache/iceberg/flink/TestChangeLogTable.java   |  2 +-
 .../iceberg/flink/TestFlinkCatalogDatabase.java    |  8 ++---
 .../iceberg/flink/TestFlinkCatalogTable.java       |  2 +-
 .../flink/TestFlinkCatalogTablePartitions.java     |  2 +-
 .../apache/iceberg/flink/TestFlinkHiveCatalog.java |  2 +-
 .../apache/iceberg/flink/TestFlinkTableSink.java   |  2 +-
 .../org/apache/iceberg/flink/TestFlinkUpsert.java  |  2 +-
 .../apache/iceberg/flink/TestIcebergConnector.java | 20 ++++--------
 .../flink/actions/TestRewriteDataFilesAction.java  |  2 +-
 .../sink/shuffle/TestDataStatisticsOperator.java   |  4 ++-
 .../flink/source/TestFlinkMetaDataTable.java       |  2 +-
 .../iceberg/flink/source/TestFlinkTableSource.java |  2 +-
 .../source/TestMetadataTableReadableMetrics.java   |  2 +-
 .../iceberg/flink/source/TestStreamScanSql.java    |  2 +-
 .../iceberg/flink/util/TestFlinkPackage.java       |  2 +-
 gradle.properties                                  |  4 +--
 gradle/libs.versions.toml                          | 24 +++++++--------
 settings.gradle                                    | 18 +++++------
 25 files changed, 110 insertions(+), 79 deletions(-)

diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml
index af1c650f30..0791f5b733 100644
--- a/.github/workflows/flink-ci.yml
+++ b/.github/workflows/flink-ci.yml
@@ -72,7 +72,7 @@ jobs:
     strategy:
       matrix:
         jvm: [8, 11]
-        flink: ['1.16', '1.17', '1.18']
+        flink: ['1.17', '1.18', '1.19']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
diff --git a/dev/stage-binaries.sh b/dev/stage-binaries.sh
index b7cd1a37ac..05bf3c4253 100755
--- a/dev/stage-binaries.sh
+++ b/dev/stage-binaries.sh
@@ -19,7 +19,7 @@
 #
 
 SCALA_VERSION=2.12
-FLINK_VERSIONS=1.16,1.17,1.18
+FLINK_VERSIONS=1.17,1.18,1.19
 SPARK_VERSIONS=3.3,3.4,3.5
 HIVE_VERSIONS=2,3
 
diff --git a/flink/build.gradle b/flink/build.gradle
index a33fc84e57..f049ff69b0 100644
--- a/flink/build.gradle
+++ b/flink/build.gradle
@@ -19,9 +19,6 @@
 
 def flinkVersions = (System.getProperty("flinkVersions") != null ? 
System.getProperty("flinkVersions") : 
System.getProperty("defaultFlinkVersions")).split(",")
 
-if (flinkVersions.contains("1.16")) {
-  apply from: file("$projectDir/v1.16/build.gradle")
-}
 
 if (flinkVersions.contains("1.17")) {
   apply from: file("$projectDir/v1.17/build.gradle")
@@ -30,3 +27,7 @@ if (flinkVersions.contains("1.17")) {
 if (flinkVersions.contains("1.18")) {
   apply from: file("$projectDir/v1.18/build.gradle")
 }
+
+if (flinkVersions.contains("1.19")) {
+  apply from: file("$projectDir/v1.19/build.gradle")
+}
diff --git a/flink/v1.19/build.gradle b/flink/v1.19/build.gradle
index c08ae5d8cc..f70c7a4d32 100644
--- a/flink/v1.19/build.gradle
+++ b/flink/v1.19/build.gradle
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-String flinkMajorVersion = '1.18'
+String flinkMajorVersion = '1.19'
 String scalaVersion = System.getProperty("scalaVersion") != null ? 
System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
 
 project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
@@ -32,15 +32,15 @@ 
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     implementation project(':iceberg-parquet')
     implementation project(':iceberg-hive-metastore')
 
-    compileOnly libs.flink118.avro
+    compileOnly libs.flink119.avro
     // for dropwizard histogram metrics implementation
-    compileOnly libs.flink118.metrics.dropwizard
-    compileOnly libs.flink118.streaming.java
-    compileOnly 
"${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests"
-    compileOnly libs.flink118.table.api.java.bridge
-    compileOnly 
"org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
-    compileOnly libs.flink118.connector.base
-    compileOnly libs.flink118.connector.files
+    compileOnly libs.flink119.metrics.dropwizard
+    compileOnly libs.flink119.streaming.java
+    compileOnly 
"${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests"
+    compileOnly libs.flink119.table.api.java.bridge
+    compileOnly 
"org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
+    compileOnly libs.flink119.connector.base
+    compileOnly libs.flink119.connector.files
 
     compileOnly libs.hadoop2.hdfs
     compileOnly libs.hadoop2.common
@@ -66,13 +66,13 @@ 
project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'org.slf4j'
     }
 
-    testImplementation libs.flink118.connector.test.utils
-    testImplementation libs.flink118.core
-    testImplementation libs.flink118.runtime
-    testImplementation(libs.flink118.test.utilsjunit) {
+    testImplementation libs.flink119.connector.test.utils
+    testImplementation libs.flink119.core
+    testImplementation libs.flink119.runtime
+    testImplementation(libs.flink119.test.utilsjunit) {
       exclude group: 'junit'
     }
-    testImplementation(libs.flink118.test.utils) {
+    testImplementation(libs.flink119.test.utils) {
       exclude group: "org.apache.curator", module: 'curator-test'
       exclude group: 'junit'
     }
@@ -166,7 +166,7 @@ 
project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
     }
 
     // for dropwizard histogram metrics implementation
-    implementation libs.flink118.metrics.dropwizard
+    implementation libs.flink119.metrics.dropwizard
 
     // for integration testing with the flink-runtime-jar
     // all of those dependencies are required because the integration test 
extends FlinkTestBase
@@ -176,13 +176,13 @@ 
project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
     integrationImplementation project(path: 
":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: 
"testArtifacts")
     integrationImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     integrationImplementation project(path: ':iceberg-hive-metastore', 
configuration: 'testArtifacts')
-    integrationImplementation(libs.flink118.test.utils) {
+    integrationImplementation(libs.flink119.test.utils) {
       exclude group: "org.apache.curator", module: 'curator-test'
       exclude group: 'junit'
     }
 
-    integrationImplementation libs.flink118.table.api.java.bridge
-    integrationImplementation 
"org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
+    integrationImplementation libs.flink119.table.api.java.bridge
+    integrationImplementation 
"org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
 
     integrationImplementation libs.hadoop2.common
     integrationImplementation libs.hadoop2.hdfs
diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 1453753849..fe4008a13c 100644
--- 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -70,8 +70,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
+  public static final String DEFAULT_CATALOG_NAME = "default_catalog";
   public static final String BASE_NAMESPACE = "base-namespace";
-
   public static final String TYPE = "type";
   public static final String PROPERTY_VERSION = "property-version";
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 8076e0ec76..0b7d19f27c 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static 
org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;
+
 import java.util.List;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
@@ -126,4 +128,20 @@ public abstract class FlinkTestBase extends TestBaseUtils {
     sql("USE CATALOG default_catalog");
     sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
   }
+
+  /**
+   * We can not drop currently used database after FLINK-33226, so we have 
make sure that we do not
+   * use the current database before dropping it. This method switches to the 
default database in
+   * the default catalog, and then it and drops the one requested.
+   *
+   * @param database The database to drop
+   * @param ifExists If we should use the 'IF EXISTS' when dropping the 
database
+   */
+  protected void dropDatabase(String database, boolean ifExists) {
+    String currentCatalog = getTableEnv().getCurrentCatalog();
+    sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
+    sql("USE %s", getTableEnv().listDatabases()[0]);
+    sql("USE CATALOG %s", currentCatalog);
+    sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
+  }
 }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 3986f1a796..e0b429b31b 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.flink;
 
+import static 
org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;
+
 import java.nio.file.Path;
 import java.util.List;
 import org.apache.flink.table.api.EnvironmentSettings;
@@ -124,7 +126,23 @@ public abstract class TestBase extends TestBaseUtils {
    * @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
    */
   protected void dropCatalog(String catalogName, boolean ifExists) {
-    sql("USE CATALOG default_catalog");
+    sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
     sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
   }
+
+  /**
+   * We can not drop currently used database after FLINK-33226, so we have 
make sure that we do not
+   * use the current database before dropping it. This method switches to the 
default database in
+   * the default catalog, and then it and drops the one requested.
+   *
+   * @param database The database to drop
+   * @param ifExists If we should use the 'IF EXISTS' when dropping the 
database
+   */
+  protected void dropDatabase(String database, boolean ifExists) {
+    String currentCatalog = getTableEnv().getCurrentCatalog();
+    sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
+    sql("USE %s", getTableEnv().listDatabases()[0]);
+    sql("USE CATALOG %s", currentCatalog);
+    sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
+  }
 }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 8e9066e391..d9f01796fb 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -98,7 +98,7 @@ public class TestChangeLogTable extends 
ChangeLogTableTestBase {
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    dropDatabase(DATABASE_NAME, true);
     dropCatalog(CATALOG_NAME, true);
     BoundedTableFactory.clearDataSets();
   }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
index f46d50a5f0..3837996704 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java
@@ -41,7 +41,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase 
{
   @Override
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
@@ -61,7 +61,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase 
{
         .as("Database should still exist")
         .isTrue();
 
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
         .as("Database should be dropped")
         .isFalse();
@@ -81,7 +81,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase 
{
     assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
         .as("Namespace should exist")
         .isTrue();
-    sql("DROP DATABASE %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
         .as("Namespace should have been dropped")
         .isFalse();
@@ -105,7 +105,7 @@ public class TestFlinkCatalogDatabase extends 
CatalogTestBase {
     
assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, 
"tl")))
         .as("Table should exist")
         .isTrue();
-    Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
+    Assertions.assertThatThrownBy(() -> dropDatabase(flinkDatabase, true))
         .cause()
         .isInstanceOf(DatabaseNotEmptyException.class)
         .hasMessage(
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index eaa92e32c4..f3af2c3cbe 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -74,7 +74,7 @@ public class TestFlinkCatalogTable extends CatalogTestBase {
   public void cleanNamespaces() {
     sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
     sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index b32be379ca..a6feb26077 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -76,7 +76,7 @@ public class TestFlinkCatalogTablePartitions extends 
CatalogTestBase {
   @AfterEach
   public void cleanNamespaces() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 47ee2afceb..dd64352013 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -97,7 +97,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase {
         "Should have a .crc file and a .parquet file", 2, 
Files.list(dataPath).count());
 
     sql("DROP TABLE test_table");
-    sql("DROP DATABASE test_db");
+    dropDatabase("test_db", false);
     dropCatalog("test_catalog", false);
   }
 }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index b7fce104f4..3f66174049 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -122,7 +122,7 @@ public class TestFlinkTableSink extends CatalogTestBase {
   @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     BoundedTableFactory.clearDataSets();
     super.clean();
   }
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index 5674c83e40..baf13017ff 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -104,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
   @Override
   @AfterEach
   public void clean() {
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index cb409b7843..632997e357 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -257,20 +257,12 @@ public class TestIcebergConnector extends FlinkTestBase {
   public void testCatalogDatabaseConflictWithFlinkDatabase() {
     sql("CREATE DATABASE IF NOT EXISTS `%s`", databaseName());
     sql("USE `%s`", databaseName());
-
-    try {
-      testCreateConnectorTable();
-      // Ensure that the table was created under the specific database.
-      Assertions.assertThatThrownBy(
-              () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", 
databaseName(), TABLE_NAME))
-          .isInstanceOf(org.apache.flink.table.api.TableException.class)
-          .hasMessageStartingWith("Could not execute CreateTable in path");
-    } finally {
-      sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
-      if (!isDefaultDatabaseName()) {
-        sql("DROP DATABASE `%s`", databaseName());
-      }
-    }
+    testCreateConnectorTable();
+    // Ensure that the table was created under the specific database.
+    Assertions.assertThatThrownBy(
+            () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", 
databaseName(), TABLE_NAME))
+        .isInstanceOf(org.apache.flink.table.api.TableException.class)
+        .hasMessageStartingWith("Could not execute CreateTable in path");
   }
 
   @Test
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index 4220775f41..5d8cce0733 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -136,7 +136,7 @@ public class TestRewriteDataFilesAction extends 
CatalogTestBase {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_WITH_PK);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
index 0e99a2d74c..8ddb147e07 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.TestTaskStateManager;
@@ -234,7 +235,8 @@ public class TestDataStatisticsOperator {
     CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
     OperatorStateStore operatorStateStore =
         abstractStateBackend.createOperatorStateBackend(
-            env, "test-operator", Collections.emptyList(), 
cancelStreamRegistry);
+            new OperatorStateBackendParametersImpl(
+                env, "test-operator", Collections.emptyList(), 
cancelStreamRegistry));
     return new StateInitializationContextImpl(null, operatorStateStore, null, 
null, null);
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
index f58cc87c6a..8352924d04 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java
@@ -126,7 +126,7 @@ public class TestFlinkMetaDataTable extends CatalogTestBase 
{
   @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
index ff14bc4062..e8ec482520 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java
@@ -95,7 +95,7 @@ public class TestFlinkTableSource extends FlinkTestBase {
   @After
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
+    dropDatabase(DATABASE_NAME, true);
     dropCatalog(CATALOG_NAME, true);
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
index 40dfda7237..9cf953342a 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java
@@ -168,7 +168,7 @@ public class TestMetadataTableReadableMetrics extends 
CatalogTestBase {
   @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
index 9e043bbbbb..d869279997 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java
@@ -94,7 +94,7 @@ public class TestStreamScanSql extends CatalogTestBase {
   @AfterEach
   public void clean() {
     sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE);
-    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    dropDatabase(flinkDatabase, true);
     super.clean();
   }
 
diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
index 08bbc4fc80..4ba4f9d983 100644
--- 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
+++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java
@@ -29,7 +29,7 @@ public class TestFlinkPackage {
   /** This unit test would need to be adjusted as new Flink version is 
supported. */
   @Test
   public void testVersion() {
-    assertThat(FlinkPackage.version()).isEqualTo("1.18.1");
+    assertThat(FlinkPackage.version()).isEqualTo("1.19.0");
   }
 
   @Test
diff --git a/gradle.properties b/gradle.properties
index ea857e7f27..f593e700fc 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,8 +16,8 @@
 jmhOutputPath=build/reports/jmh/human-readable-output.txt
 jmhJsonOutputPath=build/reports/jmh/results.json
 jmhIncludeRegex=.*
-systemProp.defaultFlinkVersions=1.18
-systemProp.knownFlinkVersions=1.16,1.17,1.18
+systemProp.defaultFlinkVersions=1.19
+systemProp.knownFlinkVersions=1.17,1.18,1.19
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
 systemProp.defaultSparkVersions=3.5
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 23628d26fd..3f2f2591e3 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -38,9 +38,9 @@ delta-spark = "3.1.0"
 esotericsoftware-kryo = "4.0.3"
 errorprone-annotations = "2.26.1"
 findbugs-jsr305 = "3.0.2"
-flink116 = { strictly = "1.16.3"}
 flink117 = { strictly = "1.17.2"}
 flink118 =  { strictly = "1.18.1"}
+flink119 =  { strictly = "1.19.0"}
 google-libraries-bom = "26.28.0"
 guava = "33.1.0-jre"
 hadoop2 = "2.7.3"
@@ -105,12 +105,6 @@ calcite-druid = { module = 
"org.apache.calcite:calcite-druid", version.ref = "ca
 delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = 
"delta-standalone" }
 errorprone-annotations = { module = 
"com.google.errorprone:error_prone_annotations", version.ref = 
"errorprone-annotations" }
 findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = 
"findbugs-jsr305" }
-flink116-avro = { module = "org.apache.flink:flink-avro", version.ref = 
"flink116" }
-flink116-connector-base = { module = "org.apache.flink:flink-connector-base", 
version.ref = "flink116" }
-flink116-connector-files = { module = 
"org.apache.flink:flink-connector-files", version.ref = "flink116" }
-flink116-metrics-dropwizard = { module = 
"org.apache.flink:flink-metrics-dropwizard", version.ref = "flink116" }
-flink116-streaming-java = { module = "org.apache.flink:flink-streaming-java", 
version.ref = "flink116" }
-flink116-table-api-java-bridge = { module = 
"org.apache.flink:flink-table-api-java-bridge", version.ref = "flink116" }
 flink117-avro = { module = "org.apache.flink:flink-avro", version.ref = 
"flink117" }
 flink117-connector-base = { module = "org.apache.flink:flink-connector-base", 
version.ref = "flink117" }
 flink117-connector-files = { module = 
"org.apache.flink:flink-connector-files", version.ref = "flink117" }
@@ -123,6 +117,12 @@ flink118-connector-files = { module = 
"org.apache.flink:flink-connector-files",
 flink118-metrics-dropwizard = { module = 
"org.apache.flink:flink-metrics-dropwizard", version.ref = "flink118" }
 flink118-streaming-java = { module = "org.apache.flink:flink-streaming-java", 
version.ref = "flink118" }
 flink118-table-api-java-bridge = { module = 
"org.apache.flink:flink-table-api-java-bridge", version.ref = "flink118" }
+flink119-avro = { module = "org.apache.flink:flink-avro", version.ref = 
"flink119" }
+flink119-connector-base = { module = "org.apache.flink:flink-connector-base", 
version.ref = "flink119" }
+flink119-connector-files = { module = 
"org.apache.flink:flink-connector-files", version.ref = "flink119" }
+flink119-metrics-dropwizard = { module = 
"org.apache.flink:flink-metrics-dropwizard", version.ref = "flink119" }
+flink119-streaming-java = { module = "org.apache.flink:flink-streaming-java", 
version.ref = "flink119" }
+flink119-table-api-java-bridge = { module = 
"org.apache.flink:flink-table-api-java-bridge", version.ref = "flink119" }
 google-libraries-bom = { module = "com.google.cloud:libraries-bom", 
version.ref = "google-libraries-bom" }
 guava-guava = { module = "com.google.guava:guava", version.ref = "guava" }
 hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = 
"hadoop2" }
@@ -175,11 +175,6 @@ assertj-core = { module = "org.assertj:assertj-core", 
version.ref = "assertj-cor
 awaitility = { module = "org.awaitility:awaitility", version.ref = 
"awaitility" }
 delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = 
"delta-spark" }
 esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = 
"esotericsoftware-kryo" }
-flink116-connector-test-utils = { module = 
"org.apache.flink:flink-connector-test-utils", version.ref = "flink116" }
-flink116-core = { module = "org.apache.flink:flink-core", version.ref = 
"flink116" }
-flink116-runtime = { module = "org.apache.flink:flink-runtime", version.ref = 
"flink116" }
-flink116-test-utils = { module = "org.apache.flink:flink-test-utils", 
version.ref = "flink116" }
-flink116-test-utilsjunit = { module = 
"org.apache.flink:flink-test-utils-junit", version.ref = "flink116" }
 flink117-connector-test-utils = { module = 
"org.apache.flink:flink-connector-test-utils", version.ref = "flink117" }
 flink117-core = { module = "org.apache.flink:flink-core", version.ref = 
"flink117" }
 flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = 
"flink117" }
@@ -190,6 +185,11 @@ flink118-core = { module = "org.apache.flink:flink-core", 
version.ref = "flink11
 flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = 
"flink118" }
 flink118-test-utils = { module = "org.apache.flink:flink-test-utils", 
version.ref = "flink118" }
 flink118-test-utilsjunit = { module = 
"org.apache.flink:flink-test-utils-junit", version.ref = "flink118" }
+flink119-connector-test-utils = { module = 
"org.apache.flink:flink-connector-test-utils", version.ref = "flink119" }
+flink119-core = { module = "org.apache.flink:flink-core", version.ref = 
"flink119" }
+flink119-runtime = { module = "org.apache.flink:flink-runtime", version.ref = 
"flink119" }
+flink119-test-utils = { module = "org.apache.flink:flink-test-utils", 
version.ref = "flink119" }
+flink119-test-utilsjunit = { module = 
"org.apache.flink:flink-test-utils-junit", version.ref = "flink119" }
 guava-testlib = { module = "com.google.guava:guava-testlib", version.ref = 
"guava" }
 jakarta-el-api = { module = "jakarta.el:jakarta.el-api", version.ref = 
"jakarta-el-api" }
 jetty-server = { module = "org.eclipse.jetty:jetty-server", version.ref = 
"jetty" }
diff --git a/settings.gradle b/settings.gradle
index 15bb83754e..4f42d24c32 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -112,15 +112,6 @@ if (!flinkVersions.isEmpty()) {
   project(':flink').name = 'iceberg-flink'
 }
 
-if (flinkVersions.contains("1.16")) {
-  include ":iceberg-flink:flink-1.16"
-  include ":iceberg-flink:flink-runtime-1.16"
-  project(":iceberg-flink:flink-1.16").projectDir = file('flink/v1.16/flink')
-  project(":iceberg-flink:flink-1.16").name = "iceberg-flink-1.16"
-  project(":iceberg-flink:flink-runtime-1.16").projectDir = 
file('flink/v1.16/flink-runtime')
-  project(":iceberg-flink:flink-runtime-1.16").name = 
"iceberg-flink-runtime-1.16"
-}
-
 if (flinkVersions.contains("1.17")) {
   include ":iceberg-flink:flink-1.17"
   include ":iceberg-flink:flink-runtime-1.17"
@@ -139,6 +130,15 @@ if (flinkVersions.contains("1.18")) {
   project(":iceberg-flink:flink-runtime-1.18").name = 
"iceberg-flink-runtime-1.18"
 }
 
+if (flinkVersions.contains("1.19")) {
+  include ":iceberg-flink:flink-1.19"
+  include ":iceberg-flink:flink-runtime-1.19"
+  project(":iceberg-flink:flink-1.19").projectDir = file('flink/v1.19/flink')
+  project(":iceberg-flink:flink-1.19").name = "iceberg-flink-1.19"
+  project(":iceberg-flink:flink-runtime-1.19").projectDir = 
file('flink/v1.19/flink-runtime')
+  project(":iceberg-flink:flink-runtime-1.19").name = 
"iceberg-flink-runtime-1.19"
+}
+
 if (sparkVersions.contains("3.3")) {
   include ":iceberg-spark:spark-3.3_${scalaVersion}"
   include ":iceberg-spark:spark-extensions-3.3_${scalaVersion}"

Reply via email to