This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit b3ebcf109a3e94727051908995e3364fa2b89039 Author: Rodrigo Meneses <[email protected]> AuthorDate: Mon Apr 15 10:03:00 2024 -0700 Flink: Refactoring code and properties to make Flink 1.19 to work --- .github/workflows/flink-ci.yml | 2 +- dev/stage-binaries.sh | 2 +- flink/build.gradle | 7 +++-- flink/v1.19/build.gradle | 36 +++++++++++----------- .../apache/iceberg/flink/FlinkCatalogFactory.java | 2 +- .../org/apache/iceberg/flink/FlinkTestBase.java | 18 +++++++++++ .../java/org/apache/iceberg/flink/TestBase.java | 20 +++++++++++- .../apache/iceberg/flink/TestChangeLogTable.java | 2 +- .../iceberg/flink/TestFlinkCatalogDatabase.java | 8 ++--- .../iceberg/flink/TestFlinkCatalogTable.java | 2 +- .../flink/TestFlinkCatalogTablePartitions.java | 2 +- .../apache/iceberg/flink/TestFlinkHiveCatalog.java | 2 +- .../apache/iceberg/flink/TestFlinkTableSink.java | 2 +- .../org/apache/iceberg/flink/TestFlinkUpsert.java | 2 +- .../apache/iceberg/flink/TestIcebergConnector.java | 20 ++++-------- .../flink/actions/TestRewriteDataFilesAction.java | 2 +- .../sink/shuffle/TestDataStatisticsOperator.java | 4 ++- .../flink/source/TestFlinkMetaDataTable.java | 2 +- .../iceberg/flink/source/TestFlinkTableSource.java | 2 +- .../source/TestMetadataTableReadableMetrics.java | 2 +- .../iceberg/flink/source/TestStreamScanSql.java | 2 +- .../iceberg/flink/util/TestFlinkPackage.java | 2 +- gradle.properties | 4 +-- gradle/libs.versions.toml | 24 +++++++-------- settings.gradle | 18 +++++------ 25 files changed, 110 insertions(+), 79 deletions(-) diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml index af1c650f30..0791f5b733 100644 --- a/.github/workflows/flink-ci.yml +++ b/.github/workflows/flink-ci.yml @@ -72,7 +72,7 @@ jobs: strategy: matrix: jvm: [8, 11] - flink: ['1.16', '1.17', '1.18'] + flink: ['1.17', '1.18', '1.19'] env: SPARK_LOCAL_IP: localhost steps: diff --git a/dev/stage-binaries.sh b/dev/stage-binaries.sh index b7cd1a37ac..05bf3c4253 100755 --- a/dev/stage-binaries.sh +++ b/dev/stage-binaries.sh @@ -19,7 +19,7 @@ # SCALA_VERSION=2.12 -FLINK_VERSIONS=1.16,1.17,1.18 +FLINK_VERSIONS=1.17,1.18,1.19 SPARK_VERSIONS=3.3,3.4,3.5 HIVE_VERSIONS=2,3 diff --git a/flink/build.gradle b/flink/build.gradle index a33fc84e57..f049ff69b0 100644 --- a/flink/build.gradle +++ b/flink/build.gradle @@ -19,9 +19,6 @@ def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",") -if (flinkVersions.contains("1.16")) { - apply from: file("$projectDir/v1.16/build.gradle") -} if (flinkVersions.contains("1.17")) { apply from: file("$projectDir/v1.17/build.gradle") @@ -30,3 +27,7 @@ if (flinkVersions.contains("1.17")) { if (flinkVersions.contains("1.18")) { apply from: file("$projectDir/v1.18/build.gradle") } + +if (flinkVersions.contains("1.19")) { + apply from: file("$projectDir/v1.19/build.gradle") +} diff --git a/flink/v1.19/build.gradle b/flink/v1.19/build.gradle index c08ae5d8cc..f70c7a4d32 100644 --- a/flink/v1.19/build.gradle +++ b/flink/v1.19/build.gradle @@ -17,7 +17,7 @@ * under the License. */ -String flinkMajorVersion = '1.18' +String flinkMajorVersion = '1.19' String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { @@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-parquet') implementation project(':iceberg-hive-metastore') - compileOnly libs.flink118.avro + compileOnly libs.flink119.avro // for dropwizard histogram metrics implementation - compileOnly libs.flink118.metrics.dropwizard - compileOnly libs.flink118.streaming.java - compileOnly "${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests" - compileOnly libs.flink118.table.api.java.bridge - compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}" - compileOnly libs.flink118.connector.base - compileOnly libs.flink118.connector.files + compileOnly libs.flink119.metrics.dropwizard + compileOnly libs.flink119.streaming.java + compileOnly "${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests" + compileOnly libs.flink119.table.api.java.bridge + compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}" + compileOnly libs.flink119.connector.base + compileOnly libs.flink119.connector.files compileOnly libs.hadoop2.hdfs compileOnly libs.hadoop2.common @@ -66,13 +66,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { exclude group: 'org.slf4j' } - testImplementation libs.flink118.connector.test.utils - testImplementation libs.flink118.core - testImplementation libs.flink118.runtime - testImplementation(libs.flink118.test.utilsjunit) { + testImplementation libs.flink119.connector.test.utils + testImplementation libs.flink119.core + testImplementation libs.flink119.runtime + testImplementation(libs.flink119.test.utilsjunit) { exclude group: 'junit' } - testImplementation(libs.flink118.test.utils) { + testImplementation(libs.flink119.test.utils) { exclude group: "org.apache.curator", module: 'curator-test' exclude group: 'junit' } @@ -166,7 +166,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") { } // for dropwizard histogram metrics implementation - implementation libs.flink118.metrics.dropwizard + implementation libs.flink119.metrics.dropwizard // for integration testing with the flink-runtime-jar // all of those dependencies are required because the integration test extends FlinkTestBase @@ -176,13 +176,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") { integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts") integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') - integrationImplementation(libs.flink118.test.utils) { + integrationImplementation(libs.flink119.test.utils) { exclude group: "org.apache.curator", module: 'curator-test' exclude group: 'junit' } - integrationImplementation libs.flink118.table.api.java.bridge - integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}" + integrationImplementation libs.flink119.table.api.java.bridge + integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}" integrationImplementation libs.hadoop2.common integrationImplementation libs.hadoop2.hdfs diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java index 1453753849..fe4008a13c 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java @@ -70,8 +70,8 @@ public class FlinkCatalogFactory implements CatalogFactory { public static final String HADOOP_CONF_DIR = "hadoop-conf-dir"; public static final String DEFAULT_DATABASE = "default-database"; public static final String DEFAULT_DATABASE_NAME = "default"; + public static final String DEFAULT_CATALOG_NAME = "default_catalog"; public static final String BASE_NAMESPACE = "base-namespace"; - public static final String TYPE = "type"; public static final String PROPERTY_VERSION = "property-version"; diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java index 8076e0ec76..0b7d19f27c 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink; +import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME; + import java.util.List; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -126,4 +128,20 @@ public abstract class FlinkTestBase extends TestBaseUtils { sql("USE CATALOG default_catalog"); sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName); } + + /** + * We can not drop currently used database after FLINK-33226, so we have make sure that we do not + * use the current database before dropping it. This method switches to the default database in + * the default catalog, and then it and drops the one requested. + * + * @param database The database to drop + * @param ifExists If we should use the 'IF EXISTS' when dropping the database + */ + protected void dropDatabase(String database, boolean ifExists) { + String currentCatalog = getTableEnv().getCurrentCatalog(); + sql("USE CATALOG %s", DEFAULT_CATALOG_NAME); + sql("USE %s", getTableEnv().listDatabases()[0]); + sql("USE CATALOG %s", currentCatalog); + sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java index 3986f1a796..e0b429b31b 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.flink; +import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME; + import java.nio.file.Path; import java.util.List; import org.apache.flink.table.api.EnvironmentSettings; @@ -124,7 +126,23 @@ public abstract class TestBase extends TestBaseUtils { * @param ifExists If we should use the 'IF EXISTS' when dropping the catalog */ protected void dropCatalog(String catalogName, boolean ifExists) { - sql("USE CATALOG default_catalog"); + sql("USE CATALOG %s", DEFAULT_CATALOG_NAME); sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName); } + + /** + * We can not drop currently used database after FLINK-33226, so we have make sure that we do not + * use the current database before dropping it. This method switches to the default database in + * the default catalog, and then it and drops the one requested. + * + * @param database The database to drop + * @param ifExists If we should use the 'IF EXISTS' when dropping the database + */ + protected void dropDatabase(String database, boolean ifExists) { + String currentCatalog = getTableEnv().getCurrentCatalog(); + sql("USE CATALOG %s", DEFAULT_CATALOG_NAME); + sql("USE %s", getTableEnv().listDatabases()[0]); + sql("USE CATALOG %s", currentCatalog); + sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java index 8e9066e391..d9f01796fb 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java @@ -98,7 +98,7 @@ public class TestChangeLogTable extends ChangeLogTableTestBase { @Override public void clean() { sql("DROP TABLE IF EXISTS %s", TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME); + dropDatabase(DATABASE_NAME, true); dropCatalog(CATALOG_NAME, true); BoundedTableFactory.clearDataSets(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index f46d50a5f0..3837996704 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -41,7 +41,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase { @Override public void clean() { sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } @@ -61,7 +61,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase { .as("Database should still exist") .isTrue(); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) .as("Database should be dropped") .isFalse(); @@ -81,7 +81,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase { assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) .as("Namespace should exist") .isTrue(); - sql("DROP DATABASE %s", flinkDatabase); + dropDatabase(flinkDatabase, true); assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace)) .as("Namespace should have been dropped") .isFalse(); @@ -105,7 +105,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase { assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl"))) .as("Table should exist") .isTrue(); - Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase)) + Assertions.assertThatThrownBy(() -> dropDatabase(flinkDatabase, true)) .cause() .isInstanceOf(DatabaseNotEmptyException.class) .hasMessage( diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index eaa92e32c4..f3af2c3cbe 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -74,7 +74,7 @@ public class TestFlinkCatalogTable extends CatalogTestBase { public void cleanNamespaces() { sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase); sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java index b32be379ca..a6feb26077 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java @@ -76,7 +76,7 @@ public class TestFlinkCatalogTablePartitions extends CatalogTestBase { @AfterEach public void cleanNamespaces() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java index 47ee2afceb..dd64352013 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java @@ -97,7 +97,7 @@ public class TestFlinkHiveCatalog extends FlinkTestBase { "Should have a .crc file and a .parquet file", 2, Files.list(dataPath).count()); sql("DROP TABLE test_table"); - sql("DROP DATABASE test_db"); + dropDatabase("test_db", false); dropCatalog("test_catalog", false); } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index b7fce104f4..3f66174049 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -122,7 +122,7 @@ public class TestFlinkTableSink extends CatalogTestBase { @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); BoundedTableFactory.clearDataSets(); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java index 5674c83e40..baf13017ff 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java @@ -104,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase { @Override @AfterEach public void clean() { - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java index cb409b7843..632997e357 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java @@ -257,20 +257,12 @@ public class TestIcebergConnector extends FlinkTestBase { public void testCatalogDatabaseConflictWithFlinkDatabase() { sql("CREATE DATABASE IF NOT EXISTS `%s`", databaseName()); sql("USE `%s`", databaseName()); - - try { - testCreateConnectorTable(); - // Ensure that the table was created under the specific database. - Assertions.assertThatThrownBy( - () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME)) - .isInstanceOf(org.apache.flink.table.api.TableException.class) - .hasMessageStartingWith("Could not execute CreateTable in path"); - } finally { - sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME); - if (!isDefaultDatabaseName()) { - sql("DROP DATABASE `%s`", databaseName()); - } - } + testCreateConnectorTable(); + // Ensure that the table was created under the specific database. + Assertions.assertThatThrownBy( + () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME)) + .isInstanceOf(org.apache.flink.table.api.TableException.class) + .hasMessageStartingWith("Could not execute CreateTable in path"); } @Test diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java index 4220775f41..5d8cce0733 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java @@ -136,7 +136,7 @@ public class TestRewriteDataFilesAction extends CatalogTestBase { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED); sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED); sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_WITH_PK); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 0e99a2d74c..8ddb147e07 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.TestTaskStateManager; @@ -234,7 +235,8 @@ public class TestDataStatisticsOperator { CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); OperatorStateStore operatorStateStore = abstractStateBackend.createOperatorStateBackend( - env, "test-operator", Collections.emptyList(), cancelStreamRegistry); + new OperatorStateBackendParametersImpl( + env, "test-operator", Collections.emptyList(), cancelStreamRegistry)); return new StateInitializationContextImpl(null, operatorStateStore, null, null, null); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index f58cc87c6a..8352924d04 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -126,7 +126,7 @@ public class TestFlinkMetaDataTable extends CatalogTestBase { @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java index ff14bc4062..e8ec482520 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkTableSource.java @@ -95,7 +95,7 @@ public class TestFlinkTableSource extends FlinkTestBase { @After public void clean() { sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME); + dropDatabase(DATABASE_NAME, true); dropCatalog(CATALOG_NAME, true); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java index 40dfda7237..9cf953342a 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestMetadataTableReadableMetrics.java @@ -168,7 +168,7 @@ public class TestMetadataTableReadableMetrics extends CatalogTestBase { @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java index 9e043bbbbb..d869279997 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamScanSql.java @@ -94,7 +94,7 @@ public class TestStreamScanSql extends CatalogTestBase { @AfterEach public void clean() { sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE); - sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + dropDatabase(flinkDatabase, true); super.clean(); } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java index 08bbc4fc80..4ba4f9d983 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/util/TestFlinkPackage.java @@ -29,7 +29,7 @@ public class TestFlinkPackage { /** This unit test would need to be adjusted as new Flink version is supported. */ @Test public void testVersion() { - assertThat(FlinkPackage.version()).isEqualTo("1.18.1"); + assertThat(FlinkPackage.version()).isEqualTo("1.19.0"); } @Test diff --git a/gradle.properties b/gradle.properties index ea857e7f27..f593e700fc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,8 +16,8 @@ jmhOutputPath=build/reports/jmh/human-readable-output.txt jmhJsonOutputPath=build/reports/jmh/results.json jmhIncludeRegex=.* -systemProp.defaultFlinkVersions=1.18 -systemProp.knownFlinkVersions=1.16,1.17,1.18 +systemProp.defaultFlinkVersions=1.19 +systemProp.knownFlinkVersions=1.17,1.18,1.19 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3 systemProp.defaultSparkVersions=3.5 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 23628d26fd..3f2f2591e3 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -38,9 +38,9 @@ delta-spark = "3.1.0" esotericsoftware-kryo = "4.0.3" errorprone-annotations = "2.26.1" findbugs-jsr305 = "3.0.2" -flink116 = { strictly = "1.16.3"} flink117 = { strictly = "1.17.2"} flink118 = { strictly = "1.18.1"} +flink119 = { strictly = "1.19.0"} google-libraries-bom = "26.28.0" guava = "33.1.0-jre" hadoop2 = "2.7.3" @@ -105,12 +105,6 @@ calcite-druid = { module = "org.apache.calcite:calcite-druid", version.ref = "ca delta-standalone = { module = "io.delta:delta-standalone_2.12", version.ref = "delta-standalone" } errorprone-annotations = { module = "com.google.errorprone:error_prone_annotations", version.ref = "errorprone-annotations" } findbugs-jsr305 = { module = "com.google.code.findbugs:jsr305", version.ref = "findbugs-jsr305" } -flink116-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink116" } -flink116-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink116" } -flink116-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink116" } -flink116-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink116" } -flink116-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink116" } -flink116-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink116" } flink117-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink117" } flink117-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink117" } flink117-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink117" } @@ -123,6 +117,12 @@ flink118-connector-files = { module = "org.apache.flink:flink-connector-files", flink118-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink118" } flink118-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink118" } flink118-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink118" } +flink119-avro = { module = "org.apache.flink:flink-avro", version.ref = "flink119" } +flink119-connector-base = { module = "org.apache.flink:flink-connector-base", version.ref = "flink119" } +flink119-connector-files = { module = "org.apache.flink:flink-connector-files", version.ref = "flink119" } +flink119-metrics-dropwizard = { module = "org.apache.flink:flink-metrics-dropwizard", version.ref = "flink119" } +flink119-streaming-java = { module = "org.apache.flink:flink-streaming-java", version.ref = "flink119" } +flink119-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink119" } google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" } guava-guava = { module = "com.google.guava:guava", version.ref = "guava" } hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" } @@ -175,11 +175,6 @@ assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj-cor awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } delta-spark = { module = "io.delta:delta-spark_2.12", version.ref = "delta-spark" } esotericsoftware-kryo = { module = "com.esotericsoftware:kryo", version.ref = "esotericsoftware-kryo" } -flink116-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink116" } -flink116-core = { module = "org.apache.flink:flink-core", version.ref = "flink116" } -flink116-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink116" } -flink116-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink116" } -flink116-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink116" } flink117-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink117" } flink117-core = { module = "org.apache.flink:flink-core", version.ref = "flink117" } flink117-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink117" } @@ -190,6 +185,11 @@ flink118-core = { module = "org.apache.flink:flink-core", version.ref = "flink11 flink118-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink118" } flink118-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink118" } flink118-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink118" } +flink119-connector-test-utils = { module = "org.apache.flink:flink-connector-test-utils", version.ref = "flink119" } +flink119-core = { module = "org.apache.flink:flink-core", version.ref = "flink119" } +flink119-runtime = { module = "org.apache.flink:flink-runtime", version.ref = "flink119" } +flink119-test-utils = { module = "org.apache.flink:flink-test-utils", version.ref = "flink119" } +flink119-test-utilsjunit = { module = "org.apache.flink:flink-test-utils-junit", version.ref = "flink119" } guava-testlib = { module = "com.google.guava:guava-testlib", version.ref = "guava" } jakarta-el-api = { module = "jakarta.el:jakarta.el-api", version.ref = "jakarta-el-api" } jetty-server = { module = "org.eclipse.jetty:jetty-server", version.ref = "jetty" } diff --git a/settings.gradle b/settings.gradle index 15bb83754e..4f42d24c32 100644 --- a/settings.gradle +++ b/settings.gradle @@ -112,15 +112,6 @@ if (!flinkVersions.isEmpty()) { project(':flink').name = 'iceberg-flink' } -if (flinkVersions.contains("1.16")) { - include ":iceberg-flink:flink-1.16" - include ":iceberg-flink:flink-runtime-1.16" - project(":iceberg-flink:flink-1.16").projectDir = file('flink/v1.16/flink') - project(":iceberg-flink:flink-1.16").name = "iceberg-flink-1.16" - project(":iceberg-flink:flink-runtime-1.16").projectDir = file('flink/v1.16/flink-runtime') - project(":iceberg-flink:flink-runtime-1.16").name = "iceberg-flink-runtime-1.16" -} - if (flinkVersions.contains("1.17")) { include ":iceberg-flink:flink-1.17" include ":iceberg-flink:flink-runtime-1.17" @@ -139,6 +130,15 @@ if (flinkVersions.contains("1.18")) { project(":iceberg-flink:flink-runtime-1.18").name = "iceberg-flink-runtime-1.18" } +if (flinkVersions.contains("1.19")) { + include ":iceberg-flink:flink-1.19" + include ":iceberg-flink:flink-runtime-1.19" + project(":iceberg-flink:flink-1.19").projectDir = file('flink/v1.19/flink') + project(":iceberg-flink:flink-1.19").name = "iceberg-flink-1.19" + project(":iceberg-flink:flink-runtime-1.19").projectDir = file('flink/v1.19/flink-runtime') + project(":iceberg-flink:flink-runtime-1.19").name = "iceberg-flink-runtime-1.19" +} + if (sparkVersions.contains("3.3")) { include ":iceberg-spark:spark-3.3_${scalaVersion}" include ":iceberg-spark:spark-extensions-3.3_${scalaVersion}"
