jordepic commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3010480422
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer<DynamicWriteResult>
getWriteResultSerializer()
return new DynamicWriteResultSerializer();
}
+ /**
+ * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the
forward write path.
+ * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator}
emits committables
+ * downstream. The committer is never called — committing is handled by the
main sink.
+ */
+ @VisibleForTesting
+ static class ForwardWriterSink
+ implements Sink<DynamicRecordInternal>,
SupportsCommitter<DynamicWriteResult> {
+
+ private final CatalogLoader catalogLoader;
+ private final Map<String, String> writeProperties;
+ private final Configuration flinkConfig;
+ private final int cacheMaximumSize;
+
+ ForwardWriterSink(
+ CatalogLoader catalogLoader,
+ Map<String, String> writeProperties,
+ Configuration flinkConfig,
+ int cacheMaximumSize) {
+ this.catalogLoader = catalogLoader;
+ this.writeProperties = writeProperties;
+ this.flinkConfig = flinkConfig;
+ this.cacheMaximumSize = cacheMaximumSize;
+ }
+
+ @Override
+ public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext
context) {
+ return new DynamicWriter(
+ catalogLoader.loadCatalog(),
+ writeProperties,
+ flinkConfig,
+ cacheMaximumSize,
+ new DynamicWriterMetrics(context.metricGroup()),
+ context.getTaskInfo().getIndexOfThisSubtask(),
+ context.getTaskInfo().getAttemptNumber());
+ }
+
+ @Override
+ public Committer<DynamicWriteResult> createCommitter(CommitterInitContext
context) {
+ throw new UnsupportedOperationException(
+ "WriterSink is used only for writing; committing is handled by the
main sink");
+ }
Review Comment:
I had initially done so so that we could wrap it with a
SinkWriterOperatorFactory to ensure that the checkpointing worked as
anticipated and stayed aligned with the other DynamicWriter. Do you think this
is unnecessary? I do agree that overriding SupportsCommitter is probably
unnecessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]