mxm commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3016175297
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer()
return new DynamicWriteResultSerializer();
}
+ /**
+ * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the forward write path.
+ * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} emits committables
+ * downstream. The committer is never called — committing is handled by the main sink.
+ */
+ @VisibleForTesting
+ static class ForwardWriterSink
+ implements Sink<DynamicRecordInternal>, SupportsCommitter<DynamicWriteResult> {
+
+ private final CatalogLoader catalogLoader;
+ private final Map<String, String> writeProperties;
+ private final Configuration flinkConfig;
+ private final int cacheMaximumSize;
+
+ ForwardWriterSink(
+ CatalogLoader catalogLoader,
+ Map<String, String> writeProperties,
+ Configuration flinkConfig,
+ int cacheMaximumSize) {
+ this.catalogLoader = catalogLoader;
+ this.writeProperties = writeProperties;
+ this.flinkConfig = flinkConfig;
+ this.cacheMaximumSize = cacheMaximumSize;
+ }
+
+ @Override
+ public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) {
+ return new DynamicWriter(
+ catalogLoader.loadCatalog(),
+ writeProperties,
+ flinkConfig,
+ cacheMaximumSize,
+ new DynamicWriterMetrics(context.metricGroup()),
+ context.getTaskInfo().getIndexOfThisSubtask(),
+ context.getTaskInfo().getAttemptNumber());
+ }
+
+ @Override
+ public Committer<DynamicWriteResult> createCommitter(CommitterInitContext context) {
+ throw new UnsupportedOperationException(
+ "WriterSink is used only for writing; committing is handled by the main sink");
+ }
Review Comment:
I see now that we need `Sink` when we use `SinkWriterOperatorFactory`, but
we can remove the `SupportsCommitter`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]