pvary commented on code in PR #15433:
URL: https://github.com/apache/iceberg/pull/15433#discussion_r2964463353
##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java:
##########
@@ -1345,6 +1465,100 @@ public Committer<DynamicCommittable>
createCommitter(CommitterInitContext contex
}
}
+ static class ShuffleOrForwardTrackingBuilder<T> extends
DynamicIcebergSink.Builder<T> {
+ static List<DynamicRecordInternal> forwardRecords = new
CopyOnWriteArrayList<>();
+ static List<DynamicRecordInternal> shuffleRecords = new
CopyOnWriteArrayList<>();
+
+ static void reset() {
+ forwardRecords.clear();
+ shuffleRecords.clear();
+ }
+
+ @Override
+ DynamicIcebergSink instantiateSink(
+ Map<String, String> writeProperties, Configuration flinkConfig,
boolean forwardOnly) {
+ return new TrackingSink(
+ CATALOG_EXTENSION.catalogLoader(),
+ Collections.emptyMap(),
+ "uidPrefix",
+ writeProperties,
+ flinkConfig,
+ 100,
+ forwardOnly);
+ }
+
+ private static class TrackingSink extends DynamicIcebergSink {
+ private final boolean forwardOnly;
+
+ TrackingSink(
+ CatalogLoader catalogLoader,
+ Map<String, String> snapshotProperties,
+ String uidPrefix,
+ Map<String, String> writeProperties,
+ Configuration flinkConfig,
+ int cacheMaximumSize,
+ boolean forwardOnly) {
+ super(
+ catalogLoader,
+ snapshotProperties,
+ uidPrefix,
+ writeProperties,
+ flinkConfig,
+ cacheMaximumSize,
+ forwardOnly);
+ this.forwardOnly = forwardOnly;
+ }
+
+ @Override
+ public SinkWriter<DynamicRecordInternal> createWriter(WriterInitContext
context) {
+ return new TrackingWriter(
+ (CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult>)
+ super.createWriter(context),
+ forwardOnly);
+ }
+ }
+
+ private static class TrackingWriter
+ implements CommittingSinkWriter<DynamicRecordInternal,
DynamicWriteResult> {
+ private final CommittingSinkWriter<DynamicRecordInternal,
DynamicWriteResult> delegate;
+ private final boolean forwardOnly;
+
+ TrackingWriter(
+ CommittingSinkWriter<DynamicRecordInternal, DynamicWriteResult>
delegate,
+ boolean forwardOnly) {
+ this.delegate = delegate;
+ this.forwardOnly = forwardOnly;
+ }
+
+ @Override
+ public void write(DynamicRecordInternal element, Context context)
+ throws IOException, InterruptedException {
+ if (forwardOnly) {
+ forwardRecords.add(element);
+ } else {
+ shuffleRecords.add(element);
+ }
+ delegate.write(element, context);
Review Comment:
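nit: please add a blank line here (presumably between the closing `}` of the `if`/`else` block and the `delegate.write(element, context);` call).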
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]