rkhachatryan commented on code in PR #27070:
URL: https://github.com/apache/flink/pull/27070#discussion_r2435810781


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -280,4 +326,64 @@ protected Transformation<RowData> applyUpsertMaterialize(
         materializeTransform.setStateKeyType(keySelector.getProducedType());
         return materializeTransform;
     }
+
+    private OneInputStreamOperator<RowData, RowData> createSumOperator(
+            ExecNodeConfig config,
+            RowType physicalRowType,
+            int[] inputUpsertKey,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            GeneratedHashFunction upsertKeyHashFunction,
+            StateTtlConfig ttlConfig,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedHashFunction rowHashFunction) {
+
+        SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
+                Optional.ofNullable(upsertMaterializeStrategy)
+                        .orElse(SinkUpsertMaterializeStrategy.LEGACY);
+
+        return sinkUpsertMaterializeStrategy == 
SinkUpsertMaterializeStrategy.LEGACY
+                ? SinkUpsertMaterializer.create(
+                        ttlConfig,
+                        physicalRowType,
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        inputUpsertKey)
+                : SinkUpsertMaterializerV2.create(
+                        physicalRowType,
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        rowHashFunction,
+                        upsertKeyHashFunction,
+                        inputUpsertKey,
+                        createStateConfig(
+                                sinkUpsertMaterializeStrategy,
+                                TimeDomain.EVENT_TIME,
+                                ttlConfig,
+                                config));
+    }
+
+    private static SequencedMultiSetStateConfig createStateConfig(
+            SinkUpsertMaterializeStrategy strategy,
+            TimeDomain ttlTimeDomain,
+            StateTtlConfig ttlConfig,
+            ReadableConfig config) {
+        switch (strategy) {
+            case VALUE:
+                return SequencedMultiSetStateConfig.forValue(ttlTimeDomain, 
ttlConfig);
+            case MAP:
+                return SequencedMultiSetStateConfig.forMap(ttlTimeDomain, 
ttlConfig);
+            case ADAPTIVE:
+                return SequencedMultiSetStateConfig.adaptive(
+                        ttlTimeDomain,
+                        config.getOptional(
+                                        
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH)
+                                .orElse(null),
+                        config.getOptional(
+                                        
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW)
+                                .orElse(null),

Review Comment:
   I always though it would throw an exception :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to