dawidwys commented on code in PR #27602:
URL: https://github.com/apache/flink/pull/27602#discussion_r2832258317
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -341,26 +337,47 @@ protected Transformation<RowData> applyUpsertMaterialize(
StateConfigUtil.createTtlConfig(
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList));
+ final String[] pkFieldNames =
+ Arrays.stream(primaryKeys)
+ .mapToObj(idx -> physicalRowType.getFieldNames().get(idx))
+ .toArray(String[]::new);
+
final OneInputStreamOperator<RowData, RowData> operator =
createSumOperator(
config,
physicalRowType,
+ primaryKeys,
+ pkFieldNames,
inputUpsertKey,
upsertKeyEqualiser,
upsertKeyHashFunction,
ttlConfig,
rowEqualiser,
rowHashFunction);
- final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
- final List<String> pkFieldNames =
- Arrays.stream(primaryKeys)
- .mapToObj(idx -> fieldNames[idx])
- .collect(Collectors.toList());
+ // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first
+ // This assigns the current watermark as the timestamp to each record,
+ // which is required for the WatermarkCompactingSinkMaterializer to work correctly
+ Transformation<RowData> transformForMaterializer = inputTransform;
+ if (isErrorOrNothingConflictStrategy()) {
+ // Use input parallelism to preserve watermark semantics
+ transformForMaterializer =
+ ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
+ createTransformationMeta(
+ WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
+ "WatermarkTimestampAssigner",
+ "WatermarkTimestampAssigner",
+ config),
+ new WatermarkTimestampAssigner(),
+ inputTransform.getOutputType(),
+ inputTransform.getParallelism(),
+ false);
+ }
Review Comment:
The plan was to add it in a subsequent PR.