mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3085223042
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -79,13 +80,17 @@ public class DynamicIcebergSink
private final Configuration flinkConfig;
private final int cacheMaximumSize;
+ // Set by the builder before sinkTo() — forward writer results to union into
pre-commit topology
+ private final transient DataStream<CommittableMessage<DynamicWriteResult>>
forwardWriteResults;
+
DynamicIcebergSink(
CatalogLoader catalogLoader,
Map<String, String> snapshotProperties,
String uidPrefix,
Map<String, String> writeProperties,
Configuration flinkConfig,
- int cacheMaximumSize) {
+ int cacheMaximumSize,
+ DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults) {
Review Comment:
```suggestion
@Nullable DataStream<CommittableMessage<DynamicWriteResult>>
forwardWriteResults) {
```
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -357,43 +416,79 @@ private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
- private DynamicIcebergSink build() {
+ private DynamicIcebergSink build(
+ SingleOutputStreamOperator<DynamicRecordInternal> converted,
+ DynamicRecordInternalType sideOutputType) {
Preconditions.checkArgument(
generator != null, "Please use withGenerator() to convert the input
DataStream.");
Preconditions.checkNotNull(catalogLoader, "Catalog loader shouldn't be
null");
- uidPrefix = Optional.ofNullable(uidPrefix).orElse("");
-
Configuration flinkConfig =
readableConfig instanceof Configuration
? (Configuration) readableConfig
: Configuration.fromMap(readableConfig.toMap());
- return instantiateSink(writeOptions, flinkConfig);
+ // Forward writer: chained with generator via forward edge, no data
shuffle
+ ForwardWriterSink forwardWriterSink =
+ new ForwardWriterSink(catalogLoader, writeOptions, flinkConfig,
cacheMaximumSize);
+ TypeInformation<CommittableMessage<DynamicWriteResult>>
writeResultTypeInfo =
+ CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
+
+ DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
+ converted
+ .getSideOutput(
+ new
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
+ .transform(
+ operatorName("Forward-Writer"),
+ writeResultTypeInfo,
+ new SinkWriterOperatorFactory<>(forwardWriterSink))
+ .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
   Review Comment:
   Could you please check the `testOperatorUidsFormat` test? The new uid of
the "Forward-Writer" operator should be added to that test. The test currently
uses `contains()` logic and needs to be migrated to `containsOnly()` to ensure
the test fails whenever we add new operators.
##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##########
@@ -238,6 +291,117 @@ void testWrite() throws Exception {
runTest(rows);
}
+ @Test
+ void testNoShuffleTopology() throws Exception {
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(
+ Collections.emptyList(), TypeInformation.of(new
TypeHint<DynamicIcebergDataImpl>() {}));
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new ForwardGenerator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .writeParallelism(2)
+ .immediateTableUpdate(false)
+ .overwrite(false)
+ .append();
+
+ boolean generatorAndSinkChained = false;
+ for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) {
+ boolean generatorInThisVertex = false;
+ boolean sinkInThisVertex = false;
+ for (OperatorIDPair operatorID : vertex.getOperatorIDs()) {
+ String uid = operatorID.getUserDefinedOperatorUid();
+ if (uid == null) {
+ continue;
+ }
+
+ if (uid.endsWith("-forward-writer")) {
+ sinkInThisVertex = true;
+ } else if (uid.endsWith("-generator")) {
+ generatorInThisVertex = true;
+ }
+ }
+
+ generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex;
+ if (generatorAndSinkChained) {
+ break;
+ }
+ }
+
+ assertThat(generatorAndSinkChained).isTrue();
+ }
   Review Comment:
   Makes me wonder whether we should break this class apart into an integration
test class and a unit test class. We can do that in a follow-up.
##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##########
@@ -238,6 +291,117 @@ void testWrite() throws Exception {
runTest(rows);
}
+ @Test
+ void testNoShuffleTopology() throws Exception {
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(
+ Collections.emptyList(), TypeInformation.of(new
TypeHint<DynamicIcebergDataImpl>() {}));
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new ForwardGenerator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .writeParallelism(2)
+ .immediateTableUpdate(false)
+ .overwrite(false)
+ .append();
+
+ boolean generatorAndSinkChained = false;
+ for (JobVertex vertex : env.getStreamGraph().getJobGraph().getVertices()) {
+ boolean generatorInThisVertex = false;
+ boolean sinkInThisVertex = false;
+ for (OperatorIDPair operatorID : vertex.getOperatorIDs()) {
+ String uid = operatorID.getUserDefinedOperatorUid();
+ if (uid == null) {
+ continue;
+ }
+
+ if (uid.endsWith("-forward-writer")) {
+ sinkInThisVertex = true;
+ } else if (uid.endsWith("-generator")) {
+ generatorInThisVertex = true;
+ }
+ }
+
+ generatorAndSinkChained = generatorInThisVertex && sinkInThisVertex;
+ if (generatorAndSinkChained) {
+ break;
+ }
+ }
+
+ assertThat(generatorAndSinkChained).isTrue();
+ }
+
+ @Test
+ void testForwardWrite() throws Exception {
+ List<DynamicIcebergDataImpl> rows =
+ Lists.newArrayList(
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ SnapshotRef.MAIN_BRANCH,
+ PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ SnapshotRef.MAIN_BRANCH,
+ PartitionSpec.unpartitioned()));
+
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+ env.setParallelism(1);
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new ForwardGenerator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .writeParallelism(1)
+ .immediateTableUpdate(true)
+ .append();
+
+ env.execute("Test Forward Write");
+
+ verifyResults(rows);
+ }
+
+ @Test
+ void testMixedForwardAndShuffleWrite() throws Exception {
+ List<DynamicIcebergDataImpl> rows =
+ Lists.newArrayList(
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ SnapshotRef.MAIN_BRANCH,
+ PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ SnapshotRef.MAIN_BRANCH,
+ PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ SnapshotRef.MAIN_BRANCH,
+ PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA,
+ "t1",
+ SnapshotRef.MAIN_BRANCH,
+ PartitionSpec.unpartitioned()));
+
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+ env.setParallelism(1);
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new MixedGenerator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .writeParallelism(1)
+ .immediateTableUpdate(true)
+ .append();
+
+ env.execute("Test Mixed Forward and Shuffle Write");
+
+ verifyResults(rows);
+ }
   Review Comment:
   The only thing that changes between the two tests is the generator function.
Should we extract the shared logic into a helper method that takes the
generator as a parameter, and call it from both test methods?
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -79,13 +80,17 @@ public class DynamicIcebergSink
private final Configuration flinkConfig;
private final int cacheMaximumSize;
+ // Set by the builder before sinkTo() — forward writer results to union into
pre-commit topology
+ private final transient DataStream<CommittableMessage<DynamicWriteResult>>
forwardWriteResults;
Review Comment:
```suggestion
@Nullable
private final transient DataStream<CommittableMessage<DynamicWriteResult>>
forwardWriteResults;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]