This is an automated email from the ASF dual-hosted git repository. stevenwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 325d282766a7151b68111191a92ea91fbac02f88 Author: Liwei Li <[email protected]> AuthorDate: Wed Apr 12 10:50:08 2023 +0800 Flink 1.15: Port Expose write-parallelism in SQL Hints to 1.15 --- .../org/apache/iceberg/flink/FlinkWriteConf.java | 4 +++ .../apache/iceberg/flink/FlinkWriteOptions.java | 3 ++ .../org/apache/iceberg/flink/sink/FlinkSink.java | 9 +++-- .../apache/iceberg/flink/TestFlinkTableSink.java | 40 ++++++++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java index 4b5c7e4a0d..aba23389f2 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java @@ -180,4 +180,8 @@ public class FlinkWriteConf { .defaultValue(FlinkWriteOptions.BRANCH.defaultValue()) .parse(); } + + public Integer writeParallelism() { + return confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional(); + } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java index 86cb2fb0eb..ba0931318e 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java @@ -61,4 +61,7 @@ public class FlinkWriteOptions { // Branch to write to public static final ConfigOption<String> BRANCH = ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH); + + public static final ConfigOption<Integer> WRITE_PARALLELISM = + ConfigOptions.key("write-parallelism").intType().noDefaultValue(); } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 445b6a6ff9..46a229d4c8 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -132,7 +132,6 @@ public class FlinkSink { private TableLoader tableLoader; private Table table; private TableSchema tableSchema; - private Integer writeParallelism = null; private List<String> equalityFieldColumns = null; private String uidPrefix = null; private final Map<String, String> snapshotProperties = Maps.newHashMap(); @@ -248,7 +247,8 @@ public class FlinkSink { * @return {@link Builder} to connect the iceberg table. */ public Builder writeParallelism(int newWriteParallelism) { - this.writeParallelism = newWriteParallelism; + writeOptions.put( + FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism)); return this; } @@ -464,7 +464,10 @@ public class FlinkSink { IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds); - int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism; + int parallelism = + flinkWriteConf.writeParallelism() == null + ? input.getParallelism() + : flinkWriteConf.writeParallelism(); SingleOutputStreamOperator<WriteResult> writerStream = input .transform( diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index c4c75edd9e..7540627989 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -18,15 +18,20 @@ */ package org.apache.iceberg.flink; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Expressions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.planner.delegation.PlannerBase; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Row; import org.apache.iceberg.DataFile; @@ -178,6 +183,41 @@ public class TestFlinkTableSink extends FlinkCatalogTestBase { icebergTable, Lists.newArrayList(SimpleDataUtil.createRecord(2, "b"))); } + @Test + public void testWriteParallelism() throws Exception { + List<Row> dataSet = + IntStream.range(1, 1000) + .mapToObj(i -> ImmutableList.of(Row.of(i, "aaa"), Row.of(i, "bbb"), Row.of(i, "ccc"))) + .flatMap(List::stream) + .collect(Collectors.toList()); + String dataId = BoundedTableFactory.registerDataSet(ImmutableList.of(dataSet)); + sql( + "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" + + " WITH ('connector'='BoundedSource', 'data-id'='%s')", + SOURCE_TABLE, dataId); + + PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner(); + String insertSQL = + String.format( + "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s", + TABLE_NAME, SOURCE_TABLE); + ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); + Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0); + Transformation<?> committer = dummySink.getInputs().get(0); + Transformation<?> writer = committer.getInputs().get(0); + + Assert.assertEquals("Should have the expected 1 parallelism.", 1, writer.getParallelism()); + + writer + .getInputs() + .forEach( + input -> + Assert.assertEquals( + "Should have the expected parallelism.", + isStreamingJob ? 2 : 4, + input.getParallelism())); + } + @Test public void testReplacePartitions() throws Exception { Assume.assumeFalse(
