openinx commented on a change in pull request #2886:
URL: https://github.com/apache/iceberg/pull/2886#discussion_r682548420
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -118,6 +117,7 @@ public static Builder forRowData(DataStream<RowData> input) {
public static class Builder {
private DataStream<RowData> rowDataInput = null;
+ private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;
Review comment:
I tried initializing the input stream lazily, as in the following diff. I
think it makes the source `rowDataInput` stream much clearer: it becomes the
only stream we need to chain the following operators onto.
```diff
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index bda6667cd..a6be26b31 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -24,6 +24,7 @@ import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -117,7 +118,7 @@ public class FlinkSink {
public static class Builder {
private DataStream<RowData> rowDataInput = null;
- private SingleOutputStreamOperator<RowData> mappedRowDataInput = null;
+ private Function<String, DataStream<RowData>> inputCreator = null;
private TableLoader tableLoader;
private Table table;
private TableSchema tableSchema;
@@ -136,9 +137,17 @@ public class FlinkSink {
}
private <T> Builder forMapperOutputType(DataStream<T> input,
- MapFunction<T, RowData> mapper,
- TypeInformation<RowData> outputType) {
- this.mappedRowDataInput = input.map(mapper, outputType);
+ MapFunction<T, RowData> mapper,
+ TypeInformation<RowData> outputType) {
+ this.inputCreator = newUidPrefix -> {
+ if (newUidPrefix != null) {
+ return input.map(mapper, outputType)
+ .name(operatorName(newUidPrefix))
+ .uid(newUidPrefix + "-mapper");
+ } else {
+ return input.map(mapper, outputType);
+ }
+ };
return this;
}
@@ -241,8 +250,11 @@ public class FlinkSink {
}
public DataStreamSink<RowData> build() {
- Preconditions.checkArgument(rowDataInput != null || mappedRowDataInput != null,
- "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
+ Preconditions.checkArgument(rowDataInput == null || inputCreator == null,
+ "Use forRowData() or forRow() to initialize the input DataStream only once.");
+ Preconditions.checkArgument(rowDataInput != null || inputCreator != null,
+ "Please use forRowData() or forRow() to initialize the input DataStream.");
+ rowDataInput = rowDataInput == null ? inputCreator.apply(uidPrefix) : rowDataInput;
Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
if (table == null) {
@@ -254,17 +266,6 @@ public class FlinkSink {
}
}
- // set name and uid for mappedRowDataInput if needed
- if (mappedRowDataInput != null) {
- if (uidPrefix != null) {
- rowDataInput = mappedRowDataInput
- .name(operatorName(uidPrefix))
- .uid(uidPrefix + "-mapper");
- } else {
- rowDataInput = mappedRowDataInput;
- }
- }
-
// Convert the requested flink table schema to flink row type.
RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
```
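To show the idea in isolation, here is a minimal sketch of the lazy-creator pattern without any Flink dependency. Everything in it is hypothetical: `LazyInputBuilder` is a made-up stand-in for `FlinkSink.Builder`, and a plain `String` stands in for `DataStream<RowData>`. It is only meant to show why deferring the `map` step to `build()` lets the uid prefix be set in any order.

```java
import java.util.function.Function;

// Hypothetical stand-in for FlinkSink.Builder; a String models a DataStream<RowData>.
final class LazyInputBuilder {
  private String rowDataInput = null;
  private Function<String, String> inputCreator = null;
  private String uidPrefix = null;

  LazyInputBuilder forRowData(String input) {
    this.rowDataInput = input;
    return this;
  }

  LazyInputBuilder forMapperOutputType(String input) {
    // Nothing is mapped here; the map step is deferred until build() knows the uid prefix.
    this.inputCreator = prefix ->
        prefix != null ? input + " -> map[uid=" + prefix + "-mapper]" : input + " -> map";
    return this;
  }

  LazyInputBuilder uidPrefix(String newPrefix) {
    this.uidPrefix = newPrefix;
    return this;
  }

  String build() {
    // Exactly one of the two input styles must have been used.
    if (rowDataInput != null && inputCreator != null) {
      throw new IllegalArgumentException("Initialize the input only once.");
    }
    if (rowDataInput == null && inputCreator == null) {
      throw new IllegalArgumentException("Initialize the input first.");
    }
    // Materialize the mapped input only now, with the final uid prefix.
    return rowDataInput != null ? rowDataInput : inputCreator.apply(uidPrefix);
  }
}
```

With this shape, `new LazyInputBuilder().forMapperOutputType("rows").uidPrefix("sink").build()` picks up the prefix even though it was set after the mapper, which is the property the diff above relies on.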
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -135,6 +135,13 @@ private Builder forRowData(DataStream<RowData> newRowDataInput) {
return this;
}
+ private <T> Builder forMapperOutputType(DataStream<T> input,
+ MapFunction<T, RowData> mapper,
Review comment:
Yeah, this is needed.
##########
File path: flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
##########
@@ -258,4 +262,58 @@ public void testShuffleByPartitionWithSchema() throws Exception {
Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, partitionFiles("ccc"));
}
}
+
+ @Test
+ public void testTwoSinksInDisjointedDAG() throws Exception {
+ Map<String, String> props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+ final String leftTablePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath().concat("/left");
Review comment:
I think we could follow the rule, even though it is only an implicit code
style rule for now.