This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new dc7ad71909 Flink: refactor sink tests to reduce the number of combinations with parameterized tests (#10777)
dc7ad71909 is described below

commit dc7ad7190989d50f3288ea02eb26d527c9f629c6
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Fri Aug 2 08:39:10 2024 -0700

    Flink: refactor sink tests to reduce the number of combinations with parameterized tests (#10777)
---
 .../org/apache/iceberg/flink/CatalogTestBase.java  |  22 --
 .../iceberg/flink/{TestBase.java => SqlBase.java}  |  85 ++-----
 .../java/org/apache/iceberg/flink/TestBase.java    |   4 +-
 .../apache/iceberg/flink/TestFlinkTableSink.java   | 114 ---------
 .../iceberg/flink/TestFlinkTableSinkExtended.java  | 244 +++++++++++++++++++
 .../apache/iceberg/flink/TestIcebergConnector.java |   4 -
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   | 270 +--------------------
 .../flink/sink/TestFlinkIcebergSinkBase.java       |  51 +++-
 .../sink/TestFlinkIcebergSinkDistributionMode.java | 180 ++++++++++++++
 ...Sink.java => TestFlinkIcebergSinkExtended.java} | 209 ++--------------
 10 files changed, 521 insertions(+), 662 deletions(-)

diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
index 91ed3c4ade..062ff68d5d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/CatalogTestBase.java
@@ -118,26 +118,4 @@ public abstract class CatalogTestBase extends TestBase {
   static String getURI(HiveConf conf) {
     return conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
   }
-
-  static String toWithClause(Map<String, String> props) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("(");
-    int propCount = 0;
-    for (Map.Entry<String, String> entry : props.entrySet()) {
-      if (propCount > 0) {
-        builder.append(",");
-      }
-      builder
-          .append("'")
-          .append(entry.getKey())
-          .append("'")
-          .append("=")
-          .append("'")
-          .append(entry.getValue())
-          .append("'");
-      propCount++;
-    }
-    builder.append(")");
-    return builder.toString();
-  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SqlBase.java
similarity index 63%
copy from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
copy to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SqlBase.java
index 6336900446..9411ea4f7d 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SqlBase.java
@@ -21,73 +21,16 @@ package org.apache.iceberg.flink;
 import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.nio.file.Path;
 import java.util.List;
-import org.apache.flink.table.api.EnvironmentSettings;
+import java.util.Map;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.hive.HiveCatalog;
-import org.apache.iceberg.hive.TestHiveMetastore;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
 
-public abstract class TestBase extends TestBaseUtils {
-
-  @RegisterExtension
-  public static MiniClusterExtension miniClusterExtension =
-      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
-
-  @TempDir protected Path temporaryDirectory;
-
-  private static TestHiveMetastore metastore = null;
-  protected static HiveConf hiveConf = null;
-  protected static HiveCatalog catalog = null;
-
-  private volatile TableEnvironment tEnv = null;
-
-  @BeforeAll
-  public static void startMetastore() {
-    TestBase.metastore = new TestHiveMetastore();
-    metastore.start();
-    TestBase.hiveConf = metastore.hiveConf();
-    TestBase.catalog =
-        (HiveCatalog)
-            CatalogUtil.loadCatalog(
-                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
-  }
-
-  @AfterAll
-  public static void stopMetastore() throws Exception {
-    metastore.stop();
-    TestBase.catalog = null;
-  }
-
-  protected TableEnvironment getTableEnv() {
-    if (tEnv == null) {
-      synchronized (this) {
-        if (tEnv == null) {
-          EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
-
-          TableEnvironment env = TableEnvironment.create(settings);
-          env.getConfig()
-              .getConfiguration()
-              .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
-          tEnv = env;
-        }
-      }
-    }
-    return tEnv;
-  }
+public abstract class SqlBase {
+  protected abstract TableEnvironment getTableEnv();
 
   protected static TableResult exec(TableEnvironment env, String query, Object... args) {
     return env.executeSql(String.format(query, args));
@@ -142,4 +85,26 @@ public abstract class TestBase extends TestBaseUtils {
     sql("USE CATALOG %s", currentCatalog);
     sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
   }
+
+  protected static String toWithClause(Map<String, String> props) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    int propCount = 0;
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      if (propCount > 0) {
+        builder.append(",");
+      }
+      builder
+          .append("'")
+          .append(entry.getKey())
+          .append("'")
+          .append("=")
+          .append("'")
+          .append(entry.getValue())
+          .append("'");
+      propCount++;
+    }
+    builder.append(")");
+    return builder.toString();
+  }
 }
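
For context, a minimal standalone sketch (not part of the patch) of what the relocated toWithClause helper renders; WithClauseDemo is an invented name, and the property map mirrors the catalog configuration built later in TestFlinkTableSinkExtended.before():

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: the rendering logic mirrors SqlBase.toWithClause above.
public class WithClauseDemo {
  static String toWithClause(Map<String, String> props) {
    StringBuilder builder = new StringBuilder("(");
    int propCount = 0;
    for (Map.Entry<String, String> entry : props.entrySet()) {
      if (propCount > 0) {
        builder.append(",");
      }
      builder.append("'").append(entry.getKey()).append("'='").append(entry.getValue()).append("'");
      propCount++;
    }
    return builder.append(")").toString();
  }

  public static void main(String[] args) {
    // Insertion-ordered map so the demo output is deterministic.
    Map<String, String> props = new LinkedHashMap<>();
    props.put("type", "iceberg");
    props.put("catalog-type", "hadoop");
    // Prints: ('type'='iceberg','catalog-type'='hadoop')
    System.out.println(toWithClause(props));
  }
}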
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
index 6336900446..401960c359 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -41,7 +40,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
-public abstract class TestBase extends TestBaseUtils {
+public abstract class TestBase extends SqlBase {
 
   @RegisterExtension
   public static MiniClusterExtension miniClusterExtension =
@@ -72,6 +71,7 @@ public abstract class TestBase extends TestBaseUtils {
     TestBase.catalog = null;
   }
 
+  @Override
   protected TableEnvironment getTableEnv() {
     if (tEnv == null) {
       synchronized (this) {
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index a0341e6834..2978a92945 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -18,36 +18,21 @@
  */
 package org.apache.iceberg.flink;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.types.Row;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.source.BoundedTableFactory;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -169,39 +154,6 @@ public class TestFlinkTableSink extends CatalogTestBase {
         icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b")));
   }
 
-  @TestTemplate
-  public void testWriteParallelism() throws Exception {
-    List<Row> dataSet =
-        IntStream.range(1, 1000)
-            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
-            .flatMap(List::stream)
-            .collect(Collectors.toList());
-    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
-    sql(
-        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
-            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
-        SOURCE_TABLE, dataId);
-
-    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
-    String insertSQL =
-        String.format(
-            "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
-            TABLE_NAME, SOURCE_TABLE);
-    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
-    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
-    Transformation<?> committer = dummySink.getInputs().get(0);
-    Transformation<?> writer = committer.getInputs().get(0);
-
-    assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
-    writer
-        .getInputs()
-        .forEach(
-            input ->
-                assertThat(input.getParallelism())
-                    .as("Should have the expected parallelism.")
-                    .isEqualTo(isStreamingJob ? 2 : 4));
-  }
-
   @TestTemplate
   public void testReplacePartitions() throws Exception {
     assumeThat(isStreamingJob)
@@ -289,70 +241,4 @@ public class TestFlinkTableSink extends CatalogTestBase {
       sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
     }
   }
-
-  @TestTemplate
-  public void testHashDistributeMode() throws Exception {
-    String tableName = "test_hash_distribution_mode";
-    Map<String, String> tableProps =
-        ImmutableMap.of(
-            "write.format.default",
-            format.name(),
-            TableProperties.WRITE_DISTRIBUTION_MODE,
-            DistributionMode.HASH.modeName());
-
-    // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
-    List<Row> dataSet =
-        IntStream.range(1, 1000)
-            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
-            .flatMap(List::stream)
-            .collect(Collectors.toList());
-    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
-    sql(
-        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
-            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
-        SOURCE_TABLE, dataId);
-
-    assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
-        .as("Should have the expected rows in source table.")
-        .containsExactlyInAnyOrderElementsOf(dataSet);
-
-    sql(
-        "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
-        tableName, toWithClause(tableProps));
-
-    try {
-      // Insert data set.
-      sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
-
-      assertThat(sql("SELECT * FROM %s", tableName))
-          .as("Should have the expected rows in sink table.")
-          .containsExactlyInAnyOrderElementsOf(dataSet);
-
-      // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
-      // thus producing multiple snapshots.  Here we assert that each snapshot has only 1 file per
-      // partition.
-      Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-      Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
-      for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
-        if (dataFiles.isEmpty()) {
-          continue;
-        }
-
-        assertThat(
-                SimpleDataUtil.matchingPartitions(
-                    dataFiles, table.spec(), ImmutableMap.of("data", "aaa")))
-            .hasSize(1);
-        assertThat(
-                SimpleDataUtil.matchingPartitions(
-                    dataFiles, table.spec(), ImmutableMap.of("data", "bbb")))
-            .hasSize(1);
-        assertThat(
-                SimpleDataUtil.matchingPartitions(
-                    dataFiles, table.spec(), ImmutableMap.of("data", "ccc")))
-            .hasSize(1);
-      }
-    } finally {
-      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
-    }
-  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
new file mode 100644
index 0000000000..482cfd110b
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.flink.FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.source.BoundedTableFactory;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * This class tests the more extended features of the Flink sink. They are extracted separately
+ * because it is unnecessary to test all the parameter combinations in {@link TestFlinkTableSink},
+ * such as catalog types, namespaces, file formats, and streaming/batch modes; those combinations
+ * multiply quickly. Each test method in {@link TestFlinkTableSink} runs 21 combinations, which is
+ * expensive and slow.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkTableSinkExtended extends SqlBase {
+  protected static final String CATALOG = "testhadoop";
+  protected static final String DATABASE = "db";
+  protected static final String TABLE = "tbl";
+
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+  private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
+  private static final String FLINK_DATABASE = CATALOG + "." + DATABASE;
+  private static final Namespace ICEBERG_NAMESPACE = Namespace.of(new String[] {DATABASE});
+
+  @TempDir protected File warehouseRoot;
+
+  protected HadoopCatalog catalog = null;
+
+  private TableEnvironment tEnv;
+
+  @Parameter protected boolean isStreamingJob;
+
+  @Parameters(name = "isStreamingJob={0}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(new Boolean[] {true}, new Boolean[] {false});
+  }
+
+  protected synchronized TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings.newInstance();
+      if (isStreamingJob) {
+        settingsBuilder.inStreamingMode();
+        StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG);
+        env.enableCheckpointing(400);
+        env.setMaxParallelism(2);
+        env.setParallelism(2);
+        tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+      } else {
+        settingsBuilder.inBatchMode();
+        tEnv = TableEnvironment.create(settingsBuilder.build());
+      }
+    }
+    return tEnv;
+  }
+
+  @BeforeEach
+  public void before() {
+    String warehouseLocation = "file:" + warehouseRoot.getPath();
+    this.catalog = new HadoopCatalog(new Configuration(), warehouseLocation);
+    Map<String, String> config = Maps.newHashMap();
+    config.put("type", "iceberg");
+    config.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HADOOP);
+    config.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+    sql("CREATE CATALOG %s WITH %s", CATALOG, toWithClause(config));
+
+    sql("CREATE DATABASE %s", FLINK_DATABASE);
+    sql("USE CATALOG %s", CATALOG);
+    sql("USE %s", DATABASE);
+    sql(
+        "CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')",
+        TABLE, FileFormat.PARQUET.name());
+  }
+
+  @AfterEach
+  public void clean() throws Exception {
+    sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, TABLE);
+    dropDatabase(FLINK_DATABASE, true);
+    BoundedTableFactory.clearDataSets();
+
+    dropCatalog(CATALOG, true);
+    catalog.close();
+  }
+
+  @TestTemplate
+  public void testWriteParallelism() {
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+    String insertSQL =
+        String.format(
+            "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
+            TABLE, SOURCE_TABLE);
+    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
+    Transformation<?> committer = dummySink.getInputs().get(0);
+    Transformation<?> writer = committer.getInputs().get(0);
+
+    assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
+    writer
+        .getInputs()
+        .forEach(
+            input ->
+                assertThat(input.getParallelism())
+                    .as("Should have the expected parallelism.")
+                    .isEqualTo(isStreamingJob ? 2 : 4));
+  }
+
+  @TestTemplate
+  public void testHashDistributeMode() throws Exception {
+    // Initialize a BoundedSource table to precisely emit those rows in only one checkpoint.
+    List<Row> dataSet =
+        IntStream.range(1, 1000)
+            .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc")))
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet));
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    assertThat(sql("SELECT * FROM %s", SOURCE_TABLE))
+        .as("Should have the expected rows in source table.")
+        .containsExactlyInAnyOrderElementsOf(dataSet);
+
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            "write.format.default",
+            FileFormat.PARQUET.name(),
+            TableProperties.WRITE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+
+    String tableName = "test_hash_distribution_mode";
+    sql(
+        "CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
+        tableName, toWithClause(tableProps));
+
+    try {
+      // Insert data set.
+      sql("INSERT INTO %s SELECT * FROM %s", tableName, SOURCE_TABLE);
+
+      assertThat(sql("SELECT * FROM %s", tableName))
+          .as("Should have the expected rows in sink table.")
+          .containsExactlyInAnyOrderElementsOf(dataSet);
+
+      // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
+      // thus producing multiple snapshots.  Here we assert that each snapshot has only 1 file per
+      // partition.
+      Table table = catalog.loadTable(TableIdentifier.of(ICEBERG_NAMESPACE, tableName));
+      Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
+      for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
+        if (dataFiles.isEmpty()) {
+          continue;
+        }
+
+        assertThat(
+                SimpleDataUtil.matchingPartitions(
+                    dataFiles, table.spec(), ImmutableMap.of("data", "aaa")))
+            .hasSize(1);
+        assertThat(
+                SimpleDataUtil.matchingPartitions(
+                    dataFiles, table.spec(), ImmutableMap.of("data", "bbb")))
+            .hasSize(1);
+        assertThat(
+                SimpleDataUtil.matchingPartitions(
+                    dataFiles, table.spec(), ImmutableMap.of("data", "ccc")))
+            .hasSize(1);
+      }
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", FLINK_DATABASE, tableName);
+    }
+  }
+}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index fdb0e0cf19..47f5485df8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -319,10 +319,6 @@ public class TestIcebergConnector extends TestBase {
     return properties.getOrDefault("catalog-database", "default_database");
   }
 
-  private String toWithClause(Map<String, String> props) {
-    return CatalogTestBase.toWithClause(props);
-  }
-
   private String createWarehouse() {
     try {
       return String.format(
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index 61ab087f2c..b778037c55 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -18,20 +18,11 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.flink.TestFixtures.DATABASE;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
@@ -39,37 +30,19 @@ import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
-
-  @RegisterExtension
-  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
-      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
-
-  @RegisterExtension
-  private static final HadoopCatalogExtension CATALOG_EXTENSION =
-      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
-
-  private TableLoader tableLoader;
-
   @Parameter(index = 0)
   private FileFormat format;
 
@@ -99,7 +72,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
 
   @BeforeEach
   public void before() throws IOException {
-    table =
+    this.table =
         CATALOG_EXTENSION
             .catalog()
             .createTable(
@@ -110,14 +83,14 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
                     : PartitionSpec.unpartitioned(),
                 ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
 
-    env =
+    this.env =
         StreamExecutionEnvironment.getExecutionEnvironment(
                 MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
             .enableCheckpointing(100)
             .setParallelism(parallelism)
             .setMaxParallelism(parallelism);
 
-    tableLoader = CATALOG_EXTENSION.tableLoader();
+    this.tableLoader = CATALOG_EXTENSION.tableLoader();
   }
 
   @TestTemplate
@@ -140,246 +113,13 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
     SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
   }
 
-  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
-      throws Exception {
-    List<Row> rows = createRows("");
-    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-        .table(table)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .writeParallelism(parallelism)
-        .distributionMode(distributionMode)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg DataStream.");
-
-    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
-  }
-
-  private int partitionFiles(String partition) throws IOException {
-    return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
-  }
-
   @TestTemplate
   public void testWriteRow() throws Exception {
-    testWriteRow(null, DistributionMode.NONE);
+    testWriteRow(parallelism, null, DistributionMode.NONE);
   }
 
   @TestTemplate
   public void testWriteRowWithTableSchema() throws Exception {
-    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
-  }
-
-  @TestTemplate
-  public void testJobNoneDistributeMode() throws Exception {
-    table
-        .updateProperties()
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
-        .commit();
-
-    testWriteRow(null, DistributionMode.NONE);
-
-    if (parallelism > 1) {
-      if (partitioned) {
-        int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
-        assertThat(files).isGreaterThan(3);
-      }
-    }
-  }
-
-  @TestTemplate
-  public void testJobHashDistributionMode() {
-    table
-        .updateProperties()
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
-        .commit();
-
-    assertThatThrownBy(() -> testWriteRow(null, DistributionMode.RANGE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Flink does not support 'range' write distribution mode now.");
-  }
-
-  @TestTemplate
-  public void testJobNullDistributionMode() throws Exception {
-    table
-        .updateProperties()
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
-        .commit();
-
-    testWriteRow(null, null);
-
-    if (partitioned) {
-      assertThat(partitionFiles("aaa")).isEqualTo(1);
-      assertThat(partitionFiles("bbb")).isEqualTo(1);
-      assertThat(partitionFiles("ccc")).isEqualTo(1);
-    }
-  }
-
-  @TestTemplate
-  public void testPartitionWriteMode() throws Exception {
-    testWriteRow(null, DistributionMode.HASH);
-    if (partitioned) {
-      assertThat(partitionFiles("aaa")).isEqualTo(1);
-      assertThat(partitionFiles("bbb")).isEqualTo(1);
-      assertThat(partitionFiles("ccc")).isEqualTo(1);
-    }
-  }
-
-  @TestTemplate
-  public void testShuffleByPartitionWithSchema() throws Exception {
-    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
-    if (partitioned) {
-      assertThat(partitionFiles("aaa")).isEqualTo(1);
-      assertThat(partitionFiles("bbb")).isEqualTo(1);
-      assertThat(partitionFiles("ccc")).isEqualTo(1);
-    }
-  }
-
-  @TestTemplate
-  public void testTwoSinksInDisjointedDAG() throws Exception {
-    Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
-
-    Table leftTable =
-        CATALOG_EXTENSION
-            .catalog()
-            .createTable(
-                TableIdentifier.of("left"),
-                SimpleDataUtil.SCHEMA,
-                partitioned
-                    ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
-                    : PartitionSpec.unpartitioned(),
-                props);
-    TableLoader leftTableLoader =
-        TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("left"));
-
-    Table rightTable =
-        CATALOG_EXTENSION
-            .catalog()
-            .createTable(
-                TableIdentifier.of("right"),
-                SimpleDataUtil.SCHEMA,
-                partitioned
-                    ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
-                    : PartitionSpec.unpartitioned(),
-                props);
-    TableLoader rightTableLoader =
-        TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), TableIdentifier.of("right"));
-
-    env =
-        StreamExecutionEnvironment.getExecutionEnvironment(
-                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
-            .enableCheckpointing(100)
-            .setParallelism(parallelism)
-            .setMaxParallelism(parallelism);
-    env.getConfig().disableAutoGeneratedUIDs();
-
-    List<Row> leftRows = createRows("left-");
-    DataStream<Row> leftStream =
-        env.fromCollection(leftRows, ROW_TYPE_INFO)
-            .name("leftCustomSource")
-            .uid("leftCustomSource");
-    FlinkSink.forRow(leftStream, SimpleDataUtil.FLINK_SCHEMA)
-        .table(leftTable)
-        .tableLoader(leftTableLoader)
-        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-        .distributionMode(DistributionMode.NONE)
-        .uidPrefix("leftIcebergSink")
-        .append();
-
-    List<Row> rightRows = createRows("right-");
-    DataStream<Row> rightStream =
-        env.fromCollection(rightRows, ROW_TYPE_INFO)
-            .name("rightCustomSource")
-            .uid("rightCustomSource");
-    FlinkSink.forRow(rightStream, SimpleDataUtil.FLINK_SCHEMA)
-        .table(rightTable)
-        .tableLoader(rightTableLoader)
-        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-        .writeParallelism(parallelism)
-        .distributionMode(DistributionMode.HASH)
-        .uidPrefix("rightIcebergSink")
-        .setSnapshotProperty("flink.test", TestFlinkIcebergSink.class.getName())
-        .setSnapshotProperties(Collections.singletonMap("direction", "rightTable"))
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg DataStream.");
-
-    SimpleDataUtil.assertTableRows(leftTable, convertToRowData(leftRows));
-    SimpleDataUtil.assertTableRows(rightTable, convertToRowData(rightRows));
-
-    leftTable.refresh();
-    assertThat(leftTable.currentSnapshot().summary()).doesNotContainKeys("flink.test", "direction");
-    rightTable.refresh();
-    assertThat(rightTable.currentSnapshot().summary())
-        .containsEntry("flink.test", TestFlinkIcebergSink.class.getName())
-        .containsEntry("direction", "rightTable");
-  }
-
-  @TestTemplate
-  public void testOverrideWriteConfigWithUnknownDistributionMode() {
-    Map<String, String> newProps = Maps.newHashMap();
-    newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
-
-    List<Row> rows = createRows("");
-    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-            .table(table)
-            .tableLoader(tableLoader)
-            .writeParallelism(parallelism)
-            .setAll(newProps);
-
-    assertThatThrownBy(builder::append)
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Invalid distribution mode: UNRECOGNIZED");
-  }
-
-  @TestTemplate
-  public void testOverrideWriteConfigWithUnknownFileFormat() {
-    Map<String, String> newProps = Maps.newHashMap();
-    newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
-
-    List<Row> rows = createRows("");
-    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-            .table(table)
-            .tableLoader(tableLoader)
-            .writeParallelism(parallelism)
-            .setAll(newProps);
-
-    assertThatThrownBy(builder::append)
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Invalid file format: UNRECOGNIZED");
-  }
-
-  @TestTemplate
-  public void testWriteRowWithTableRefreshInterval() throws Exception {
-    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
-    DataStream<RowData> dataStream =
-        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
-            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
-
-    Configuration flinkConf = new Configuration();
-    flinkConf.setString(FlinkWriteOptions.TABLE_REFRESH_INTERVAL.key(), "100ms");
-
-    FlinkSink.forRowData(dataStream)
-        .table(table)
-        .tableLoader(tableLoader)
-        .flinkConf(flinkConf)
-        .writeParallelism(parallelism)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg DataStream");
-
-    // Assert the iceberg table's records.
-    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
+    testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
index b38aa6b50c..9ce36cc1e8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -18,29 +18,52 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 public class TestFlinkIcebergSinkBase {
 
-  protected Table table;
-  protected StreamExecutionEnvironment env;
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+  @RegisterExtension
+  protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
   protected static final TypeInformation<Row> ROW_TYPE_INFO =
       new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
 
   protected static final DataFormatConverters.RowConverter CONVERTER =
       new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
 
+  protected TableLoader tableLoader;
+  protected Table table;
+  protected StreamExecutionEnvironment env;
+
   protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
     return new BoundedTestSource<>(rows.toArray(new Row[0]));
   }
@@ -61,4 +84,28 @@ public class TestFlinkIcebergSinkBase {
   protected List<RowData> convertToRowData(List<Row> rows) {
    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
   }
+
+  protected void testWriteRow(
+      int writerParallelism, TableSchema tableSchema, DistributionMode distributionMode)
+      throws Exception {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .writeParallelism(writerParallelism)
+        .distributionMode(distributionMode)
+        .append();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
+  }
+
+  protected int partitionFiles(String partition) throws IOException {
+    return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
+  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
new file mode 100644
index 0000000000..75e397d3f2
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * This class tests the distribution modes of the Flink sink. They are extracted separately because
+ * it is unnecessary to test them against every file format (Avro, ORC, Parquet) as {@link
+ * TestFlinkIcebergSink} does. Removing the file format dimension reduces the number of
+ * combinations from 12 to 4, which helps reduce test run time.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestFlinkIcebergSinkDistributionMode extends TestFlinkIcebergSinkBase {
+
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private final FileFormat format = FileFormat.PARQUET;
+
+  @Parameter(index = 0)
+  private int parallelism;
+
+  @Parameter(index = 1)
+  private boolean partitioned;
+
+  @Parameters(name = "parallelism = {0}, partitioned = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {1, true},
+      {1, false},
+      {2, true},
+      {2, false}
+    };
+  }
+
+  @BeforeEach
+  public void before() throws IOException {
+    this.table =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                partitioned
+                    ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
+                    : PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    this.env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism);
+
+    this.tableLoader = CATALOG_EXTENSION.tableLoader();
+  }
+
+  @TestTemplate
+  public void testShuffleByPartitionWithSchema() throws Exception {
+    testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
+    if (partitioned) {
+      assertThat(partitionFiles("aaa")).isEqualTo(1);
+      assertThat(partitionFiles("bbb")).isEqualTo(1);
+      assertThat(partitionFiles("ccc")).isEqualTo(1);
+    }
+  }
+
+  @TestTemplate
+  public void testJobNoneDistributeMode() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+        .commit();
+
+    testWriteRow(parallelism, null, DistributionMode.NONE);
+
+    if (parallelism > 1) {
+      if (partitioned) {
+        int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
+        assertThat(files).isGreaterThan(3);
+      }
+    }
+  }
+
+  @TestTemplate
+  public void testJobNullDistributionMode() throws Exception {
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
+        .commit();
+
+    testWriteRow(parallelism, null, null);
+
+    if (partitioned) {
+      assertThat(partitionFiles("aaa")).isEqualTo(1);
+      assertThat(partitionFiles("bbb")).isEqualTo(1);
+      assertThat(partitionFiles("ccc")).isEqualTo(1);
+    }
+  }
+
+  @TestTemplate
+  public void testPartitionWriteMode() throws Exception {
+    testWriteRow(parallelism, null, DistributionMode.HASH);
+    if (partitioned) {
+      assertThat(partitionFiles("aaa")).isEqualTo(1);
+      assertThat(partitionFiles("bbb")).isEqualTo(1);
+      assertThat(partitionFiles("ccc")).isEqualTo(1);
+    }
+  }
+
+  @TestTemplate
+  public void testOverrideWriteConfigWithUnknownDistributionMode() {
+    Map<String, String> newProps = Maps.newHashMap();
+    newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
+
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+            .table(table)
+            .tableLoader(tableLoader)
+            .writeParallelism(parallelism)
+            .setAll(newProps);
+
+    assertThatThrownBy(builder::append)
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid distribution mode: UNRECOGNIZED");
+  }
+}
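
As a sanity check on the combination counts quoted in the class javadocs, a small sketch (not part of the patch; CombinationCountCheck is an invented name) that derives them from the @Parameters arrays shown in this diff:

// Deriving the per-test-method combination counts from the parameter arrays.
public class CombinationCountCheck {
  public static void main(String[] args) {
    int formats = 3;       // AVRO, ORC, PARQUET in TestFlinkIcebergSink
    int parallelisms = 2;  // 1 and 2
    int partitioned = 2;   // true and false
    // Before: every test method ran against the full matrix.
    System.out.println(formats * parallelisms * partitioned); // 12
    // After: TestFlinkIcebergSinkDistributionMode pins the format to PARQUET.
    System.out.println(parallelisms * partitioned); // 4
  }
}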
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
similarity index 54%
copy from flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
copy to flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
index 61ab087f2c..36a59b2043 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkExtended.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import static org.apache.iceberg.flink.TestFixtures.DATABASE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -29,21 +28,15 @@ import java.util.Map;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Parameter;
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkWriteOptions;
-import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
@@ -53,53 +46,21 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.Test;
 
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
-
-  @RegisterExtension
-  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
-      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
-
-  @RegisterExtension
-  private static final HadoopCatalogExtension CATALOG_EXTENSION =
-      new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
-
-  private TableLoader tableLoader;
-
-  @Parameter(index = 0)
-  private FileFormat format;
-
-  @Parameter(index = 1)
-  private int parallelism;
-
-  @Parameter(index = 2)
-  private boolean partitioned;
-
-  @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-      {FileFormat.AVRO, 1, true},
-      {FileFormat.AVRO, 1, false},
-      {FileFormat.AVRO, 2, true},
-      {FileFormat.AVRO, 2, false},
-      {FileFormat.ORC, 1, true},
-      {FileFormat.ORC, 1, false},
-      {FileFormat.ORC, 2, true},
-      {FileFormat.ORC, 2, false},
-      {FileFormat.PARQUET, 1, true},
-      {FileFormat.PARQUET, 1, false},
-      {FileFormat.PARQUET, 2, true},
-      {FileFormat.PARQUET, 2, false}
-    };
-  }
+/**
+ * This class tests the more extended features of the Flink sink. They are extracted separately
+ * because it is unnecessary to test all the parameter combinations in {@link TestFlinkIcebergSink}.
+ * Each test method in {@link TestFlinkIcebergSink} runs 12 combinations, which is expensive and
+ * slow.
+ */
+public class TestFlinkIcebergSinkExtended extends TestFlinkIcebergSinkBase {
+  private final boolean partitioned = true;
+  private final int parallelism = 2;
+  private final FileFormat format = FileFormat.PARQUET;
 
   @BeforeEach
   public void before() throws IOException {
-    table =
+    this.table =
         CATALOG_EXTENSION
             .catalog()
             .createTable(
@@ -110,135 +71,17 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
                     : PartitionSpec.unpartitioned(),
                 ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
 
-    env =
+    this.env =
         StreamExecutionEnvironment.getExecutionEnvironment(
                 MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
             .enableCheckpointing(100)
             .setParallelism(parallelism)
             .setMaxParallelism(parallelism);
 
-    tableLoader = CATALOG_EXTENSION.tableLoader();
-  }
-
-  @TestTemplate
-  public void testWriteRowData() throws Exception {
-    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
-    DataStream<RowData> dataStream =
-        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
-            .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
-
-    FlinkSink.forRowData(dataStream)
-        .table(table)
-        .tableLoader(tableLoader)
-        .writeParallelism(parallelism)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg DataStream");
-
-    // Assert the iceberg table's records.
-    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
-  }
-
-  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
-      throws Exception {
-    List<Row> rows = createRows("");
-    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-        .table(table)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .writeParallelism(parallelism)
-        .distributionMode(distributionMode)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg DataStream.");
-
-    SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
-  }
-
-  private int partitionFiles(String partition) throws IOException {
-    return SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", partition)).size();
-  }
-
-  @TestTemplate
-  public void testWriteRow() throws Exception {
-    testWriteRow(null, DistributionMode.NONE);
-  }
-
-  @TestTemplate
-  public void testWriteRowWithTableSchema() throws Exception {
-    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
-  }
-
-  @TestTemplate
-  public void testJobNoneDistributeMode() throws Exception {
-    table
-        .updateProperties()
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
-        .commit();
-
-    testWriteRow(null, DistributionMode.NONE);
-
-    if (parallelism > 1) {
-      if (partitioned) {
-        int files = partitionFiles("aaa") + partitionFiles("bbb") + partitionFiles("ccc");
-        assertThat(files).isGreaterThan(3);
-      }
-    }
-  }
-
-  @TestTemplate
-  public void testJobHashDistributionMode() {
-    table
-        .updateProperties()
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
-        .commit();
-
-    assertThatThrownBy(() -> testWriteRow(null, DistributionMode.RANGE))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Flink does not support 'range' write distribution mode now.");
-  }
-
-  @TestTemplate
-  public void testJobNullDistributionMode() throws Exception {
-    table
-        .updateProperties()
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
-        .commit();
-
-    testWriteRow(null, null);
-
-    if (partitioned) {
-      assertThat(partitionFiles("aaa")).isEqualTo(1);
-      assertThat(partitionFiles("bbb")).isEqualTo(1);
-      assertThat(partitionFiles("ccc")).isEqualTo(1);
-    }
+    this.tableLoader = CATALOG_EXTENSION.tableLoader();
   }
 
-  @TestTemplate
-  public void testPartitionWriteMode() throws Exception {
-    testWriteRow(null, DistributionMode.HASH);
-    if (partitioned) {
-      assertThat(partitionFiles("aaa")).isEqualTo(1);
-      assertThat(partitionFiles("bbb")).isEqualTo(1);
-      assertThat(partitionFiles("ccc")).isEqualTo(1);
-    }
-  }
-
-  @TestTemplate
-  public void testShuffleByPartitionWithSchema() throws Exception {
-    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.HASH);
-    if (partitioned) {
-      assertThat(partitionFiles("aaa")).isEqualTo(1);
-      assertThat(partitionFiles("bbb")).isEqualTo(1);
-      assertThat(partitionFiles("ccc")).isEqualTo(1);
-    }
-  }
-
-  @TestTemplate
+  @Test
   public void testTwoSinksInDisjointedDAG() throws Exception {
     Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
 
@@ -319,27 +162,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
         .containsEntry("direction", "rightTable");
   }
 
-  @TestTemplate
-  public void testOverrideWriteConfigWithUnknownDistributionMode() {
-    Map<String, String> newProps = Maps.newHashMap();
-    newProps.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), "UNRECOGNIZED");
-
-    List<Row> rows = createRows("");
-    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
-
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-            .table(table)
-            .tableLoader(tableLoader)
-            .writeParallelism(parallelism)
-            .setAll(newProps);
-
-    assertThatThrownBy(builder::append)
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Invalid distribution mode: UNRECOGNIZED");
-  }
-
-  @TestTemplate
+  @Test
   public void testOverrideWriteConfigWithUnknownFileFormat() {
     Map<String, String> newProps = Maps.newHashMap();
     newProps.put(FlinkWriteOptions.WRITE_FORMAT.key(), "UNRECOGNIZED");
@@ -359,7 +182,7 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
         .hasMessage("Invalid file format: UNRECOGNIZED");
   }
 
-  @TestTemplate
+  @Test
   public void testWriteRowWithTableRefreshInterval() throws Exception {
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
