This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new dc7ad71909 Flink: refactor sink tests to reduce the number of combinations with parameterized tests (#10777)
dc7ad71909 is described below
commit dc7ad7190989d50f3288ea02eb26d527c9f629c6
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Fri Aug 2 08:39:10 2024 -0700
Flink: refactor sink tests to reduce the number of combinations with parameterized tests (#10777)
---
.../org/apache/iceberg/flink/CatalogTestBase.java | 22 --
.../iceberg/flink/{TestBase.java => SqlBase.java} | 85 ++-----
.../java/org/apache/iceberg/flink/TestBase.java | 4 +-
.../apache/iceberg/flink/TestFlinkTableSink.java | 114 ---------
.../iceberg/flink/TestFlinkTableSinkExtended.java | 244 +++++++++++++++++++
.../apache/iceberg/flink/TestIcebergConnector.java | 4 -
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 270 +--------------------
.../flink/sink/TestFlinkIcebergSinkBase.java | 51 +++-
.../sink/TestFlinkIcebergSinkDistributionMode.java | 180 ++++++++++++++
...Sink.java => TestFlinkIcebergSinkExtended.java} | 209 ++--------------
10 files changed, 521 insertions(+), 662 deletions(-)
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
index 91ed3c4ade..062ff68d5d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
@@ -118,26 +118,4 @@ public abstract class CatalogTestBase extends TestBase {
static String getURI(HiveConf conf) {
return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
}
-
- static String toWithClause(Map<String, String> props) {
- StringBuilder builder = new StringBuilder();
- builder.append("(");
- int propCount = 0;
- for (Map.Entry<String, String> entry : props.entrySet()) {
- if (propCount > 0) {
- builder.append(",");
- }
- builder
- .append("'")
- .append(entry.getKey())
- .append("'")
- .append("=")
- .append("'")
- .append(entry.getValue())
- .append("'");
- propCount++;
- }
- builder.append(")");
- return builder.toString();
- }
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SqlBase.java
similarity index 63%
copy from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
copy to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SqlBase.java
index 6336900446..9411ea4f7d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SqlBase.java
@@ -21,73 +21,16 @@ package org.apache.iceberg.flink;
import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;
import static org.assertj.core.api.Assertions.assertThat;
-import java.nio.file.Path;
import java.util.List;
-import org.apache.flink.table.api.EnvironmentSettings;
+import java.util.Map;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.TestHiveMetastore;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
-public abstract class TestBase extends TestBaseUtils {
-
- @RegisterExtension
- public static MiniClusterExtension miniClusterExtension =
- MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
-
- @TempDir protected Path temporaryDirectory;
-
- private static TestHiveMetastore metastore = null;
- protected static HiveConf hiveConf = null;
- protected static HiveCatalog catalog = null;
-
- private volatile TableEnvironment tEnv = null;
-
- @BeforeAll
- public static void startMetastore() {
- TestBase.metastore = new TestHiveMetastore();
- metastore.start();
- TestBase.hiveConf = metastore.hiveConf();
- TestBase.catalog =
- (HiveCatalog)
- CatalogUtil.loadCatalog(
- HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
- }
-
- @AfterAll
- public static void stopMetastore() throws Exception {
- metastore.stop();
- TestBase.catalog = null;
- }
-
- protected TableEnvironment getTableEnv() {
- if (tEnv == null) {
- synchronized (this) {
- if (tEnv == null) {
- EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
-
- TableEnvironment env = TableEnvironment.create(settings);
- env.getConfig()
- .getConfiguration()
- .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
- tEnv = env;
- }
- }
- }
- return tEnv;
- }
+public abstract class SqlBase {
+ protected abstract TableEnvironment getTableEnv();
protected static TableResult exec(TableEnvironment env, String query, Object... args) {
return env.executeSql(String.format(query, args));
@@ -142,4 +85,26 @@ public abstract class TestBase extends TestBaseUtils {
sql("USE CATALOG %s", currentCatalog);
sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
}
+
+ protected static String toWithClause(Map<String, String> props) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(");
+ int propCount = 0;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (propCount > 0) {
+ builder.append(",");
+ }
+ builder
+ .append("'")
+ .append(entry.getKey())
+ .append("'")
+ .append("=")
+ .append("'")
+ .append(entry.getValue())
+ .append("'");
+ propCount++;
+ }
+ builder.append(")");
+ return builder.toString();
+ }
}
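For context on the helper that moved into SqlBase above: toWithClause renders a property map as the parenthesized options clause of a Flink SQL DDL statement, so the tests can splice table properties into CREATE statements. A minimal, self-contained sketch of the expected behavior, using only the JDK (class name and property values here are illustrative, not part of the commit):

import java.util.LinkedHashMap;
import java.util.Map;

class ToWithClauseSketch {
  // Mirrors the logic of SqlBase.toWithClause from the hunk above.
  static String toWithClause(Map<String, String> props) {
    StringBuilder builder = new StringBuilder("(");
    int propCount = 0;
    for (Map.Entry<String, String> entry : props.entrySet()) {
      if (propCount > 0) {
        builder.append(",");
      }
      builder.append("'").append(entry.getKey()).append("'='").append(entry.getValue()).append("'");
      propCount++;
    }
    return builder.append(")").toString();
  }

  public static void main(String[] args) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("write.format.default", "parquet");
    props.put("write.distribution-mode", "hash");

    // Prints: CREATE TABLE t (id INT, data VARCHAR) PARTITIONED BY (data)
    //         WITH ('write.format.default'='parquet','write.distribution-mode'='hash')
    System.out.printf(
        "CREATE TABLE t (id INT, data VARCHAR) PARTITIONED BY (data) WITH %s%n",
        toWithClause(props));
  }
}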
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 6336900446..401960c359 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -41,7 +40,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
-public abstract class TestBase extends TestBaseUtils {
+public abstract class TestBase extends SqlBase {
@RegisterExtension
public static MiniClusterExtension miniClusterExtension =
@@ -72,6 +71,7 @@ public abstract class TestBase extends TestBaseUtils {
TestBase.catalog = null;
}
+ @Override
protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index a0341e6834..2978a92945 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,36 +18,21 @@
*/
package org.apache.iceberg.flink;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.source.BoundedTableFactory;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -169,39 +154,6 @@ public class TestFlinkTableSink extends CatalogTestBase {
icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
}
- @TestTemplate
- public void testWriteParallelism() throws Exception {
- List<Row> dataSet =
- IntStream.range(1, 1000)
- .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
- .flatMap(List::stream)
- .collect(Collectors.toList());
- String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
- sql(
- "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
- + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
- SOURCE_TABLE, dataId);
-
- PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
- String insertSQL =
- String.format(
- "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT *
FROM %s",
- TABLE_NAME, SOURCE_TABLE);
- ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
- Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
- Transformation<?> committer = dummySink.getInputs().get(0);
- Transformation<?> writer = committer.getInputs().get(0);
-
- assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
- writer
- .getInputs()
- .forEach(
- input ->
- assertThat(input.getParallelism())
- .as("Should have the expected parallelism.")
- .isEqualTo(isStreamingJob ? 2 : 4));
- }
-
@TestTemplate
public void testReplacePartitions() throws Exception {
assumeThat(isStreamingJob)
@@ -289,70 +241,4 @@ public class TestFlinkTableSink extends CatalogTestBase {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
}
}
-
- @TestTemplate
- public void testHashDistributeMode() throws Exception {
- String tableName = "test_hash_distribution_mode";
- Map<String, String> tableProps =
- ImmutableMap.of(
- "write.format.default",
- format.name(),
- TableProperties.WRITE_DISTRIBUTION_MODE,
- DistributionMode.HASH.modeName());
-
- // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
- List<Row> dataSet =
- IntStream.range(1, 1000)
- .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
- .flatMap(List::stream)
- .collect(Collectors.toList());
- String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
- sql(
- "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
- + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
- SOURCE_TABLE, dataId);
-
- assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
- .as("Should have the expected rows in source table.")
- .containsExactlyInAnyOrderElementsOf(dataSet);
-
- sql(
- "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
- tableName, toWithClause(tableProps));
-
- try {
- // Insert data set.
- sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
-
- assertThat(sql("SELECT * FROM %s", tableName))
- .as("Should have the expected rows in sink table.")
- .containsExactlyInAnyOrderElementsOf(dataSet);
-
- // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
- // thus producing multiple snapshots. Here we assert that each snapshot has only 1 file per
- // partition.
- Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
- Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
- for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
- if (dataFiles.isEmpty()) {
- continue;
- }
-
- assertThat(
- SimpleDataUtil.matchingPartitions(
- dataFiles, table.spec(), ImmutableMap.of("data", "aaa")))
- .hasSize(1);
- assertThat(
- SimpleDataUtil.matchingPartitions(
- dataFiles, table.spec(), ImmutableMap.of("data", "bbb")))
- .hasSize(1);
- assertThat(
- SimpleDataUtil.matchingPartitions(
- dataFiles, table.spec(), ImmutableMap.of("data", "ccc")))
- .hasSize(1);
- }
- } finally {
- sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
- }
- }
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
new file mode 100644
index 0000000000..482cfd110b
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.flink.FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * This class tests the more extended features of Flink sink. Extract them separately since it is
+ * unnecessary to test all the parameters combinations in {@link TestFlinkTableSink}, like catalog
+ * types, namespaces, file format, streaming/batch. Those combinations explode exponentially. Each
+ * test method in {@link TestFlinkTableSink} runs 21 combinations, which are expensive and slow.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkTableSinkExtended extends SqlBase {
+ protected static final String CATALOG = "testhadoop";
+ protected static final String DATABASE = "db";
+ protected static final String TABLE = "tbl";
+
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+ private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
+ private static final String FLINK_DATABASE = CATALOG + "." + DATABASE;
+ private static final Namespace ICEBERG_NAMESPACE = Namespace.of(new String[] {DATABASE});
+
+ @TempDir protected File warehouseRoot;
+
+ protected HadoopCatalog catalog = null;
+
+ private TableEnvironment tEnv;
+
+ @Parameter protected boolean isStreamingJob;
+
+ @Parameters(name = "isStreamingJob={0}")
+ protected static List<Object[]> parameters() {
+ return Arrays.asList(new Boolean[] {true}, new Boolean[] {false});
+ }
+
+ protected synchronized TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
+ if (isStreamingJob) {
+ settingsBuilder.inStreamingMode();
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ env.enableCheckpointing(400);
+ env.setMaxParallelism(2);
+ env.setParallelism(2);
+ tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+ } else {
+ settingsBuilder.inBatchMode();
+ tEnv = TableEnvironment.create(settingsBuilder.build());
+ }
+ }
+ return tEnv;
+ }
+
+ @BeforeEach
+ public void before() {
+ String warehouseLocation = "file:" + warehouseRoot.getPath();
+ this.catalog = new HadoopCatalog(new Configuration(), warehouseLocation);
+ Map<String, String> config = Maps.newHashMap();
+ config.put("type", "iceberg");
+ config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HADOOP);
+ config.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+ sql("CREATE CATALOG %s WITH %s", CATALOG, toWithClause(config));
+
+ sql("CREATE DATABASE %s", FLINK_DATABASE);
+ sql("USE CATALOG %s", CATALOG);
+ sql("USE %s", DATABASE);
+ sql(
+ "CREATE TABLE %s (id int, data varchar) with
('write.format.default'='%s')",
+ TABLE, FileFormat.PARQUET.name());
+ }
+
+ @AfterEach
+ public void clean() throws Exception {
+ sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, TABLE);
+ dropDatabase(FLINK_DATABASE, true);
+ BoundedTableFactory.clearDataSets();
+
+ dropCatalog(CATALOG, true);
+ catalog.close();
+ }
+
+ @TestTemplate
+ public void testWriteParallelism() {
+ List<Row> dataSet =
+ IntStream.range(1, 1000)
+ .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+ SOURCE_TABLE, dataId);
+
+ PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+ String insertSQL =
+ String.format(
+ "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT *
FROM %s",
+ TABLE, SOURCE_TABLE);
+ ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+ Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+ Transformation<?> committer = dummySink.getInputs().get(0);
+ Transformation<?> writer = committer.getInputs().get(0);
+
+ assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
+ writer
+ .getInputs()
+ .forEach(
+ input ->
+ assertThat(input.getParallelism())
+ .as("Should have the expected parallelism.")
+ .isEqualTo(isStreamingJob ? 2 : 4));
+ }
+
+ @TestTemplate
+ public void testHashDistributeMode() throws Exception {
+ // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
+ List<Row> dataSet =
+ IntStream.range(1, 1000)
+ .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+ SOURCE_TABLE, dataId);
+
+ assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
+ .as("Should have the expected rows in source table.")
+ .containsExactlyInAnyOrderElementsOf(dataSet);
+
+ Map<String, String> tableProps =
+ ImmutableMap.of(
+ "write.format.default",
+ FileFormat.PARQUET.name(),
+ TableProperties.WRITE_DISTRIBUTION_MODE,
+ DistributionMode.HASH.modeName());
+
+ String tableName = "test_hash_distribution_mode";
+ sql(
+ "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+ tableName, toWithClause(tableProps));
+
+ try {
+ // Insert data set.
+ sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
+
+ assertThat(sql("SELECT * FROM %s", tableName))
+ .as("Should have the expected rows in sink table.")
+ .containsExactlyInAnyOrderElementsOf(dataSet);
+
+ // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
+ // thus producing multiple snapshots. Here we assert that each snapshot has only 1 file per
+ // partition.
+ Table table = catalog.loadTable(TableIdentifier.of(ICEBERG_NAMESPACE, tableName));
+ Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
+ for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
+ if (dataFiles.isEmpty()) {
+ continue;
+ }
+
+ assertThat(
+ SimpleDataUtil.matchingPartitions(
+ dataFiles, table.spec(), ImmutableMap.of("data", "aaa")))
+ .hasSize(1);
+ assertThat(
+ SimpleDataUtil.matchingPartitions(
+ dataFiles, table.spec(), ImmutableMap.of("data", "bbb")))
+ .hasSize(1);
+ assertThat(
+ SimpleDataUtil.matchingPartitions(
+ dataFiles, table.spec(), ImmutableMap.of("data", "ccc")))
+ .hasSize(1);
+ }
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName);
+ }
+ }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index fdb0e0cf19..47f5485df8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -319,10 +319,6 @@ public class TestIcebergConnector extends TestBase {
return properties.getOrDefault("catalog-database", "default_database");
}
- private String toWithClause(Map<String, String> props) {
- return CatalogTestBase.toWithClause(props);
- }
-
private String createWarehouse() {
try {
return String.format(
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 61ab087f2c..b778037c55 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -18,20 +18,11 @@
*/
package org.apache.iceberg.flink.sink;
-import static org.apache.iceberg.flink.TestFixtures.DATABASE;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
@@ -39,37 +30,19 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
@ExtendWith(ParameterizedTestExtension.class)
public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
-
- @RegisterExtension
- public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
- MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
-
- @RegisterExtension
- private static final HadoopCatalogExtension CATALOG_EXTENSION =
- new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
-
- private TableLoader tableLoader;
-
@Parameter(index = 0)
private FileFormat format;
@@ -99,7 +72,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@BeforeEach
public void before() throws IOException {
- table =
+ this.table =
CATALOG_EXTENSION
.catalog()
.createTable(
@@ -110,14 +83,14 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
: PartitionSpec.unpartitioned(),
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
- env =
+ this.env =
StreamExecutionEnvironment.getExecutionEnvironment(
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
- tableLoader = CATALOG_EXTENSION.tableLoader();
+ this.tableLoader = CATALOG_EXTENSION.tableLoader();
}
@TestTemplate
@@ -140,246 +113,13 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
}
- private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
- throws Exception {
- List<Row> rows = createRows("");
- DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(table)
- .tableLoader(tableLoader)
- .tableSchema(tableSchema)
- .writeParallelism(parallelism)
- .distributionMode(distributionMode)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
-
- SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
- }
-
- private int partitionFiles(String partition) throws IOException {
- return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
- }
-
@TestTemplate
public void testWriteRow() throws Exception {
- testWriteRow(null, DistributionMode.NONE);
+ testWriteRow(parallelism, null, DistributionMode.NONE);
}
@TestTemplate
public void testWriteRowWithTableSchema() throws Exception {
- testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
- }
-
- @TestTemplate
- public void testJobNoneDistributeMode() throws Exception {
- table
- .updateProperties()
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
- .commit();
-
- testWriteRow(null, DistributionMode.NONE);
-
- if (parallelism > 1) {
- if (partitioned) {
- int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
- assertThat(files).isGreaterThan(3);
- }
- }
- }
-
- @TestTemplate
- public void testJobHashDistributionMode() {
- table
- .updateProperties()
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
- .commit();
-
- assertThatThrownBy(() -> testWriteRow(null, DistributionMode.RANGE))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Flink does not support 'range' write distribution mode
now.");
- }
-
- @TestTemplate
- public void testJobNullDistributionMode() throws Exception {
- table
- .updateProperties()
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
- .commit();
-
- testWriteRow(null, null);
-
- if (partitioned) {
- assertThat(partitionFiles("aaa")).isEqualTo(1);
- assertThat(partitionFiles("bbb")).isEqualTo(1);
- assertThat(partitionFiles("ccc")).isEqualTo(1);
- }
- }
-
- @TestTemplate
- public void testPartitionWriteMode() throws Exception {
- testWriteRow(null, DistributionMode.HASH);
- if (partitioned) {
- assertThat(partitionFiles("aaa")).isEqualTo(1);
- assertThat(partitionFiles("bbb")).isEqualTo(1);
- assertThat(partitionFiles("ccc")).isEqualTo(1);
- }
- }
-
- @TestTemplate
- public void testShuffleByPartitionWithSchema() throws Exception {
- testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
- if (partitioned) {
- assertThat(partitionFiles("aaa")).isEqualTo(1);
- assertThat(partitionFiles("bbb")).isEqualTo(1);
- assertThat(partitionFiles("ccc")).isEqualTo(1);
- }
- }
-
- @TestTemplate
- public void testTwoSinksInDisjointedDAG() throws Exception {
- Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-
- Table leftTable =
- CATALOG_EXTENSION
- .catalog()
- .createTable(
- TableIdentifier.of("left"),
- SimpleDataUtil.SCHEMA,
- partitioned
- ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
- : PartitionSpec.unpartitioned(),
- props);
- TableLoader leftTableLoader =
- TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("left"));
-
- Table rightTable =
- CATALOG_EXTENSION
- .catalog()
- .createTable(
- TableIdentifier.of("right"),
- SimpleDataUtil.SCHEMA,
- partitioned
- ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
- : PartitionSpec.unpartitioned(),
- props);
- TableLoader rightTableLoader =
- TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("right"));
-
- env =
- StreamExecutionEnvironment.getExecutionEnvironment(
- MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
- .enableCheckpointing(100)
- .setParallelism(parallelism)
- .setMaxParallelism(parallelism);
- env.getConfig().disableAutoGeneratedUIDs();
-
- List<Row> leftRows = createRows("left-");
- DataStream<Row> leftStream =
- env.fromCollection(leftRows, ROW_TYPE_INFO)
- .name("leftCustomSource")
- .uid("leftCustomSource");
- FlinkSink.forRow(leftStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(leftTable)
- .tableLoader(leftTableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .distributionMode(DistributionMode.NONE)
- .uidPrefix("leftIcebergSink")
- .append();
-
- List<Row> rightRows = createRows("right-");
- DataStream<Row> rightStream =
- env.fromCollection(rightRows, ROW_TYPE_INFO)
- .name("rightCustomSource")
- .uid("rightCustomSource");
- FlinkSink.forRow(rightStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(rightTable)
- .tableLoader(rightTableLoader)
- .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
- .writeParallelism(parallelism)
- .distributionMode(DistributionMode.HASH)
- .uidPrefix("rightIcebergSink")
- .setSnapshotProperty("flink.test",
TestFlinkIcebergSink.class.getName())
- .setSnapshotProperties(Collections.singletonMap("direction",
"rightTable"))
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
-
- SimpleDataUtil.assertTableRows(leftTable, convertToRowData(leftRows));
- SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));
-
- leftTable.refresh();
- assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test", "direction");
- rightTable.refresh();
- assertThat(rightTable.currentSnapshot().summary())
- .containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
- .containsEntry("direction", "rightTable");
- }
-
- @TestTemplate
- public void testOverrideWriteConfigWithUnknownDistributionMode() {
- Map<String, String> newProps = Maps.newHashMap();
- newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
-
- List<Row> rows = createRows("");
- DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(table)
- .tableLoader(tableLoader)
- .writeParallelism(parallelism)
- .setAll(newProps);
-
- assertThatThrownBy(builder::append)
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Invalid distribution mode: UNRECOGNIZED");
- }
-
- @TestTemplate
- public void testOverrideWriteConfigWithUnknownFileFormat() {
- Map<String, String> newProps = Maps.newHashMap();
- newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
-
- List<Row> rows = createRows("");
- DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(table)
- .tableLoader(tableLoader)
- .writeParallelism(parallelism)
- .setAll(newProps);
-
- assertThatThrownBy(builder::append)
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Invalid file format: UNRECOGNIZED");
- }
-
- @TestTemplate
- public void testWriteRowWithTableRefreshInterval() throws Exception {
- List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
- DataStream<RowData> dataStream =
- env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
- .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
-
- Configuration flinkConf = new Configuration();
- flinkConf.setString(FlinkWriteOptions.TABLE_REFRESH_INTERVAL.key(), "100ms");
-
- FlinkSink.forRowData(dataStream)
- .table(table)
- .tableLoader(tableLoader)
- .flinkConf(flinkConf)
- .writeParallelism(parallelism)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream");
-
- // Assert the iceberg table's records.
- SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
+ testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
}
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
index b38aa6b50c..9ce36cc1e8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -18,29 +18,52 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+
+import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.extension.RegisterExtension;
public class TestFlinkIcebergSinkBase {
- protected Table table;
- protected StreamExecutionEnvironment env;
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+ @RegisterExtension
+ protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
protected static final TypeInformation<Row> ROW_TYPE_INFO =
new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
protected static final DataFormatConverters.RowConverter CONVERTER =
new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+ protected TableLoader tableLoader;
+ protected Table table;
+ protected StreamExecutionEnvironment env;
+
protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
return new BoundedTestSource<>(rows.toArray(new Row[0]));
}
@@ -61,4 +84,28 @@ public class TestFlinkIcebergSinkBase {
protected List<RowData> convertToRowData(List<Row> rows) {
return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
}
+
+ protected void testWriteRow(
+ int writerParallelism, TableSchema tableSchema, DistributionMode distributionMode)
+ throws Exception {
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .tableSchema(tableSchema)
+ .writeParallelism(writerParallelism)
+ .distributionMode(distributionMode)
+ .append();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream.");
+
+ SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
+ }
+
+ protected int partitionFiles(String partition) throws IOException {
+ return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
+ }
}
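Taken together, TestFlinkIcebergSinkBase above now owns the shared fixtures (mini cluster, Hadoop catalog extension, table/env/tableLoader fields) plus the testWriteRow and partitionFiles helpers. A rough sketch of the subclass pattern this enables, assuming the fixtures behave as in the hunks above (the class name, parallelism value, and test body are hypothetical; the real suites follow in the next diffs):

package org.apache.iceberg.flink.sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestFlinkIcebergSinkSketch extends TestFlinkIcebergSinkBase {

  @BeforeEach
  public void before() {
    // Create the test table through the shared catalog extension, then wire
    // env and tableLoader the same way the suites in this commit do.
    this.table =
        CATALOG_EXTENSION
            .catalog()
            .createTable(
                TestFixtures.TABLE_IDENTIFIER,
                SimpleDataUtil.SCHEMA,
                PartitionSpec.unpartitioned());
    this.env =
        StreamExecutionEnvironment.getExecutionEnvironment(
                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
            .enableCheckpointing(100)
            .setParallelism(1)
            .setMaxParallelism(1);
    this.tableLoader = CATALOG_EXTENSION.tableLoader();
  }

  @Test
  public void testWriteRowSketch() throws Exception {
    // Shared helper from the base class: builds a bounded source, appends a
    // FlinkSink with the given parallelism/schema/mode, and verifies the rows.
    testWriteRow(1, null, DistributionMode.NONE);
  }
}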
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
new file mode 100644
index 0000000000..75e397d3f2
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * This tests the distribution mode of Flink sink. Extract them separately since it is unnecessary
+ * to test different file formats (Avro, Orc, Parquet) like in {@link TestFlinkIcebergSink}.
+ * Removing the file format dimension reduces the number of combinations from 12 to 4, which helps
+ * reduce test run time.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSinkDistributionMode extends TestFlinkIcebergSinkBase {
+
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+ @RegisterExtension
+ private static final HadoopCatalogExtension CATALOG_EXTENSION =
+ new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+ private final FileFormat format = FileFormat.PARQUET;
+
+ @Parameter(index = 0)
+ private int parallelism;
+
+ @Parameter(index = 1)
+ private boolean partitioned;
+
+ @Parameters(name = "parallelism = {0}, partitioned = {1}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {1, true},
+ {1, false},
+ {2, true},
+ {2, false}
+ };
+ }
+
+ @BeforeEach
+ public void before() throws IOException {
+ this.table =
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ SimpleDataUtil.SCHEMA,
+ partitioned
+ ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
+ : PartitionSpec.unpartitioned(),
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+ this.env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .enableCheckpointing(100)
+ .setParallelism(parallelism)
+ .setMaxParallelism(parallelism);
+
+ this.tableLoader = CATALOG_EXTENSION.tableLoader();
+ }
+
+ @TestTemplate
+ public void testShuffleByPartitionWithSchema() throws Exception {
+ testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
+ if (partitioned) {
+ assertThat(partitionFiles("aaa")).isEqualTo(1);
+ assertThat(partitionFiles("bbb")).isEqualTo(1);
+ assertThat(partitionFiles("ccc")).isEqualTo(1);
+ }
+ }
+
+ @TestTemplate
+ public void testJobNoneDistributeMode() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+ .commit();
+
+ testWriteRow(parallelism, null, DistributionMode.NONE);
+
+ if (parallelism > 1) {
+ if (partitioned) {
+ int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
+ assertThat(files).isGreaterThan(3);
+ }
+ }
+ }
+
+ @TestTemplate
+ public void testJobNullDistributionMode() throws Exception {
+ table
+ .updateProperties()
+ .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+ .commit();
+
+ testWriteRow(parallelism, null, null);
+
+ if (partitioned) {
+ assertThat(partitionFiles("aaa")).isEqualTo(1);
+ assertThat(partitionFiles("bbb")).isEqualTo(1);
+ assertThat(partitionFiles("ccc")).isEqualTo(1);
+ }
+ }
+
+ @TestTemplate
+ public void testPartitionWriteMode() throws Exception {
+ testWriteRow(parallelism, null, DistributionMode.HASH);
+ if (partitioned) {
+ assertThat(partitionFiles("aaa")).isEqualTo(1);
+ assertThat(partitionFiles("bbb")).isEqualTo(1);
+ assertThat(partitionFiles("ccc")).isEqualTo(1);
+ }
+ }
+
+ @TestTemplate
+ public void testOverrideWriteConfigWithUnknownDistributionMode() {
+ Map<String, String> newProps = Maps.newHashMap();
+ newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
+
+ List<Row> rows = createRows("");
+ DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+ FlinkSink.Builder builder =
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .writeParallelism(parallelism)
+ .setAll(newProps);
+
+ assertThatThrownBy(builder::append)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid distribution mode: UNRECOGNIZED");
+ }
+}
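For context on the counts in the javadoc above: the old TestFlinkIcebergSink matrix multiplies three file formats by two parallelism values by two partitioning flags (12 runs per test template), while this suite fixes the format to Parquet and keeps only the two distribution-relevant dimensions (4 runs). A small sketch of that arithmetic, using plain strings rather than the real parameter objects:

class SinkTestMatrixSketch {
  public static void main(String[] args) {
    String[] formats = {"avro", "orc", "parquet"}; // dimension dropped by the new suite
    int[] parallelisms = {1, 2};
    boolean[] partitionedFlags = {true, false};

    // TestFlinkIcebergSink.parameters(): every @TestTemplate runs 3 * 2 * 2 = 12 times.
    int oldRuns = formats.length * parallelisms.length * partitionedFlags.length;

    // TestFlinkIcebergSinkDistributionMode.parameters(): format is fixed, so 2 * 2 = 4 runs.
    int newRuns = parallelisms.length * partitionedFlags.length;

    System.out.printf("runs per test: old=%d, new=%d%n", oldRuns, newRuns);
  }
}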
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
similarity index 54%
copy from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
copy to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
index 61ab087f2c..36a59b2043 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink;
-import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -29,21 +28,15 @@ import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Parameter;
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TableLoader;
@@ -53,53 +46,21 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.Test;
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
-
- @RegisterExtension
- public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
- MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
-
- @RegisterExtension
- private static final HadoopCatalogExtension CATALOG_EXTENSION =
- new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
-
- private TableLoader tableLoader;
-
- @Parameter(index = 0)
- private FileFormat format;
-
- @Parameter(index = 1)
- private int parallelism;
-
- @Parameter(index = 2)
- private boolean partitioned;
-
- @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
- public static Object[][] parameters() {
- return new Object[][] {
- {FileFormat.AVRO, 1, true},
- {FileFormat.AVRO, 1, false},
- {FileFormat.AVRO, 2, true},
- {FileFormat.AVRO, 2, false},
- {FileFormat.ORC, 1, true},
- {FileFormat.ORC, 1, false},
- {FileFormat.ORC, 2, true},
- {FileFormat.ORC, 2, false},
- {FileFormat.PARQUET, 1, true},
- {FileFormat.PARQUET, 1, false},
- {FileFormat.PARQUET, 2, true},
- {FileFormat.PARQUET, 2, false}
- };
- }
+/**
+ * This class tests the more extended features of Flink sink. Extract them separately since it is
+ * unnecessary to test all the parameters combinations in {@link TestFlinkIcebergSink}. Each test
+ * method in {@link TestFlinkIcebergSink} runs 12 combinations, which are expensive and slow.
+ */
+public class TestFlinkIcebergSinkExtended extends TestFlinkIcebergSinkBase {
+ private final boolean partitioned = true;
+ private final int parallelism = 2;
+ private final FileFormat format = FileFormat.PARQUET;
@BeforeEach
public void before() throws IOException {
- table =
+ this.table =
CATALOG_EXTENSION
.catalog()
.createTable(
@@ -110,135 +71,17 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
: PartitionSpec.unpartitioned(),
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
- env =
+ this.env =
StreamExecutionEnvironment.getExecutionEnvironment(
MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
.enableCheckpointing(100)
.setParallelism(parallelism)
.setMaxParallelism(parallelism);
- tableLoader = CATALOG_EXTENSION.tableLoader();
- }
-
- @TestTemplate
- public void testWriteRowData() throws Exception {
- List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
- DataStream<RowData> dataStream =
- env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
- .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
-
- FlinkSink.forRowData(dataStream)
- .table(table)
- .tableLoader(tableLoader)
- .writeParallelism(parallelism)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream");
-
- // Assert the iceberg table's records.
- SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
- }
-
- private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
- throws Exception {
- List<Row> rows = createRows("");
- DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(table)
- .tableLoader(tableLoader)
- .tableSchema(tableSchema)
- .writeParallelism(parallelism)
- .distributionMode(distributionMode)
- .append();
-
- // Execute the program.
- env.execute("Test Iceberg DataStream.");
-
- SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
- }
-
- private int partitionFiles(String partition) throws IOException {
- return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
- }
-
- @TestTemplate
- public void testWriteRow() throws Exception {
- testWriteRow(null, DistributionMode.NONE);
- }
-
- @TestTemplate
- public void testWriteRowWithTableSchema() throws Exception {
- testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
- }
-
- @TestTemplate
- public void testJobNoneDistributeMode() throws Exception {
- table
- .updateProperties()
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
- .commit();
-
- testWriteRow(null, DistributionMode.NONE);
-
- if (parallelism > 1) {
- if (partitioned) {
- int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
- assertThat(files).isGreaterThan(3);
- }
- }
- }
-
- @TestTemplate
- public void testJobHashDistributionMode() {
- table
- .updateProperties()
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
- .commit();
-
- assertThatThrownBy(() -> testWriteRow(null, DistributionMode.RANGE))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Flink does not support 'range' write distribution mode
now.");
- }
-
- @TestTemplate
- public void testJobNullDistributionMode() throws Exception {
- table
- .updateProperties()
- .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
- .commit();
-
- testWriteRow(null, null);
-
- if (partitioned) {
- assertThat(partitionFiles("aaa")).isEqualTo(1);
- assertThat(partitionFiles("bbb")).isEqualTo(1);
- assertThat(partitionFiles("ccc")).isEqualTo(1);
- }
+ this.tableLoader = CATALOG_EXTENSION.tableLoader();
}
- @TestTemplate
- public void testPartitionWriteMode() throws Exception {
- testWriteRow(null, DistributionMode.HASH);
- if (partitioned) {
- assertThat(partitionFiles("aaa")).isEqualTo(1);
- assertThat(partitionFiles("bbb")).isEqualTo(1);
- assertThat(partitionFiles("ccc")).isEqualTo(1);
- }
- }
-
- @TestTemplate
- public void testShuffleByPartitionWithSchema() throws Exception {
- testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
- if (partitioned) {
- assertThat(partitionFiles("aaa")).isEqualTo(1);
- assertThat(partitionFiles("bbb")).isEqualTo(1);
- assertThat(partitionFiles("ccc")).isEqualTo(1);
- }
- }
-
- @TestTemplate
+ @Test
public void testTwoSinksInDisjointedDAG() throws Exception {
Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
@@ -319,27 +162,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.containsEntry("direction", "rightTable");
}
- @TestTemplate
- public void testOverrideWriteConfigWithUnknownDistributionMode() {
- Map<String, String> newProps = Maps.newHashMap();
- newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
-
- List<Row> rows = createRows("");
- DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
- FlinkSink.Builder builder =
- FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
- .table(table)
- .tableLoader(tableLoader)
- .writeParallelism(parallelism)
- .setAll(newProps);
-
- assertThatThrownBy(builder::append)
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Invalid distribution mode: UNRECOGNIZED");
- }
-
- @TestTemplate
+ @Test
public void testOverrideWriteConfigWithUnknownFileFormat() {
Map<String, String> newProps = Maps.newHashMap();
newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
@@ -359,7 +182,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
.hasMessage("Invalid file format: UNRECOGNIZED");
}
- @TestTemplate
+ @Test
public void testWriteRowWithTableRefreshInterval() throws Exception {
List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
DataStream<RowData> dataStream =