jordepic commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r3010496222
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java:
##########
@@ -167,6 +180,55 @@ public SimpleVersionedSerializer<DynamicWriteResult> getWriteResultSerializer()
return new DynamicWriteResultSerializer();
}
+ /**
+ * A lightweight Sink used with {@link SinkWriterOperatorFactory} for the forward write path.
+ * Implements {@link SupportsCommitter} so that {@code SinkWriterOperator} emits committables
+ * downstream. The committer is never called — committing is handled by the main sink.
+ */
+ @VisibleForTesting
+ static class ForwardWriterSink
+ implements Sink<DynamicRecordInternal>, SupportsCommitter<DynamicWriteResult> {
+
+ private final CatalogLoader catalogLoader;
+ private final Map<String, String> writeProperties;
+ private final Configuration flinkConfig;
+ private final int cacheMaximumSize;
+
+ ForwardWriterSink(
+ CatalogLoader catalogLoader,
+ Map<String, String> writeProperties,
+ Configuration flinkConfig,
+ int cacheMaximumSize) {
+ this.catalogLoader = catalogLoader;
+ this.writeProperties = writeProperties;
+ this.flinkConfig = flinkConfig;
+ this.cacheMaximumSize = cacheMaximumSize;
+ }
+
+ @Override
+ public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext context) {
+ return new DynamicWriter(
+ catalogLoader.loadCatalog(),
+ writeProperties,
+ flinkConfig,
+ cacheMaximumSize,
+ new DynamicWriterMetrics(context.metricGroup()),
+ context.getTaskInfo().getIndexOfThisSubtask(),
+ context.getTaskInfo().getAttemptNumber());
+ }
+
+ @Override
+ public Committer<DynamicWriteResult> createCommitter(CommitterInitContext context) {
+ throw new UnsupportedOperationException(
+ "WriterSink is used only for writing; committing is handled by the main sink");
+ }
Review Comment:
The SinkWriterOperator is what prepares the snapshots for us before the barrier,
so I tried to reuse that existing code where possible instead of duplicating it.
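
For illustration only, here is a minimal sketch of the pattern, assuming the writer operator only forwards committables when the sink advertises committer support. All types below (`SimpleSink`, `CommitCapable`, `WriteOnlySink`, `runUntilBarrier`) are hypothetical stand-ins, not Flink's or Iceberg's real classes:

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardWriterSketch {

  // Hypothetical stand-ins; these are NOT Flink's Sink/SinkWriter/SupportsCommitter.
  public interface Writer {
    void write(String record);

    List<String> prepareCommit();
  }

  public interface SimpleSink {
    Writer createWriter();
  }

  public interface CommitCapable {
    Object createCommitter();
  }

  // A write-only sink: it advertises committer support so that the operator
  // forwards committables, but it never actually builds a committer.
  public static class WriteOnlySink implements SimpleSink, CommitCapable {
    @Override
    public Writer createWriter() {
      return new Writer() {
        private final List<String> pending = new ArrayList<>();

        @Override
        public void write(String record) {
          pending.add("file-for-" + record);
        }

        @Override
        public List<String> prepareCommit() {
          List<String> result = new ArrayList<>(pending);
          pending.clear();
          return result;
        }
      };
    }

    @Override
    public Object createCommitter() {
      throw new UnsupportedOperationException("committing is handled elsewhere");
    }
  }

  // Stand-in for the reused writer operator: write everything, then collect the
  // committables before the barrier. They are only forwarded downstream when
  // the sink is marked as commit-capable (an assumption of this sketch).
  public static List<String> runUntilBarrier(SimpleSink sink, List<String> records) {
    Writer writer = sink.createWriter();
    records.forEach(writer::write);
    List<String> committables = writer.prepareCommit();
    return (sink instanceof CommitCapable) ? committables : List.of();
  }
}
```

The point of the sketch is that the marker interface is used purely as a signal to the reused operator; `createCommitter` is never reached on this path.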
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]