pnowojski commented on code in PR #27070:
URL: https://github.com/apache/flink/pull/27070#discussion_r2440127318


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -883,6 +935,43 @@ public enum RetryStrategy {
         FIXED_DELAY
     }
 
+    /** SinkUpsertMaterializer strategy. */
+    @PublicEvolving
+    public enum SinkUpsertMaterializeStrategy {
+        /**
+         * Simple implementation based on {@code ValueState<List>} (the original implementation).
+         *
+         * <ul>
+         *   <li>optimal for cases with history under approx. 100 elements
+         *   <li>limited TTL support (per key granularity, i.e. no expiration for old history
+         *       elements)
+         * </ul>
+         */
+        LEGACY,
+        /**
+         * OrderedMultiSetState-based implementation based on a combination of several MapState
+         * maintaining ordering and fast lookup properties.
+         *
+         * <ul>
+         *   <li>faster and more memory-efficient on long histories
+         *   <li>slower on short histories
+         *   <li>currently, no TTL support (to be added in the future)
+         *   <li>requires more space
+         * </ul>
+         */
+        MAP,
+        /**
+         * Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.
+         */

Review Comment:
   Does it mean that `LEGACY` doesn't support switching to anything else?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,58 @@ public class ExecutionConfigOptions {
                                                     + "or force materialization(FORCE).")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")
+                                            .linebreak()
+                                            .text(
+                                                    "LEGACY: Simple implementation based on ValueState<List> (the original implementation).")

Review Comment:
   nit: instead of hardcoding `LEGACY`, I would use `SinkUpsertMaterializeStrategy.LEGACY.toString()` (here and in other places). That will make refactoring easier.
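A minimal sketch of the nit above: the option description is built from the enum constant rather than a string literal. `StrategyDescriptionSketch`, `describe` and `supported` are hypothetical names, and the local enum is a stand-in whose constants are only the names mentioned in this thread; the real `SinkUpsertMaterializeStrategy` may differ.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

final class StrategyDescriptionSketch {

    // Hypothetical stand-in for SinkUpsertMaterializeStrategy; the constants are
    // the names mentioned in this review, the real enum may differ.
    enum SinkUpsertMaterializeStrategy {
        LEGACY,
        MAP,
        VALUE,
        ADAPTIVE
    }

    // Prefixes the summary with the constant's name, so renaming the constant
    // also updates the documentation text.
    static String describe(SinkUpsertMaterializeStrategy strategy, String summary) {
        return strategy.toString() + ": " + summary;
    }

    // Lists every constant in declaration order, e.g. for "Supported strategies:".
    static String supported() {
        return Arrays.stream(SinkUpsertMaterializeStrategy.values())
                .map(Object::toString)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        System.out.println(supported());
        System.out.println(
                describe(
                        SinkUpsertMaterializeStrategy.LEGACY,
                        "Simple implementation based on ValueState<List> (the original implementation)."));
    }
}
```

`Enum.name()` is final in Java and always returns the declared identifier, so it is a slightly safer choice if the enum ever overrides `toString()`; the sketch uses `toString()` as suggested in the comment.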



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,58 @@ public class ExecutionConfigOptions {
                                                     + "or force materialization(FORCE).")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")

Review Comment:
   Can you change between strategies?
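For reference when discussing switching: the two `threshold` options quoted in this hunk describe a per-key hysteresis between the VALUE and MAP representations. A minimal sketch of that decision, assuming the switch happens when the entry count reaches `threshold.high` (VALUE to MAP) or drops to `threshold.low` (MAP to VALUE). Whether the PR uses inclusive comparisons, and whether it validates `low < high`, is not visible in the diff. The fallback values are the defaults quoted in the option descriptions; `AdaptiveStrategySketch` and its methods are hypothetical.

```java
final class AdaptiveStrategySketch {

    // Representation currently used for one key's history.
    enum Impl {
        VALUE,
        MAP
    }

    // Returns {low, high}. A null argument means the option is unset, so the
    // backend-specific default from the option descriptions applies.
    // The low < high check is an assumption, not taken from the PR.
    static long[] resolveThresholds(Long low, Long high, boolean hashMapBackend) {
        long resolvedLow = low != null ? low : (hashMapBackend ? 300L : 40L);
        long resolvedHigh = high != null ? high : (hashMapBackend ? 400L : 50L);
        if (resolvedLow >= resolvedHigh) {
            throw new IllegalArgumentException(
                    "threshold.low (" + resolvedLow + ") must be below threshold.high (" + resolvedHigh + ")");
        }
        return new long[] {resolvedLow, resolvedHigh};
    }

    // Hysteresis: VALUE -> MAP once the entry count reaches `high`,
    // MAP -> VALUE once it drops to `low`; otherwise keep the current one.
    static Impl next(Impl current, long entries, long low, long high) {
        if (current == Impl.VALUE && entries >= high) {
            return Impl.MAP;
        }
        if (current == Impl.MAP && entries <= low) {
            return Impl.VALUE;
        }
        return current;
    }

    public static void main(String[] args) {
        long[] t = resolveThresholds(null, null, false); // RocksDB and the rest: 40 / 50
        Impl impl = Impl.VALUE;
        for (long entries : new long[] {10, 50, 45, 40}) {
            impl = next(impl, entries, t[0], t[1]);
            System.out.println(entries + " entries -> " + impl);
        }
    }
}
```

The gap between the two thresholds keeps a key whose history hovers around a single cutoff from flipping back and forth. The validation matters when only one option is set: `threshold.low=100` on RocksDB would otherwise be combined with the default `high` of 50.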



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -104,6 +118,10 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
     private final boolean upsertMaterialize;
 
+    @JsonProperty(FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final SinkUpsertMaterializeStrategy upsertMaterializeStrategy;

Review Comment:
   I take it this code doesn't support any migration between strategies? Is there really no way to switch, not even from `VALUE` to `ADAPTIVE`?
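A related restore detail: a compiled plan written before this field existed does not contain it (and `@JsonInclude(NON_DEFAULT)` also omits it when it is null), so by default Jackson passes `null` for the missing creator property. A minimal sketch of one way to keep such plans on the implementation they were running with, assuming a missing value should mean `LEGACY`, the original implementation. `StrategyRestoreSketch` and `resolve` are hypothetical, and this does not migrate state between strategies.

```java
final class StrategyRestoreSketch {

    // Hypothetical stand-in for SinkUpsertMaterializeStrategy.
    enum SinkUpsertMaterializeStrategy {
        LEGACY,
        MAP,
        VALUE,
        ADAPTIVE
    }

    // A plan serialized before the strategy field existed yields null here;
    // treat it as LEGACY, the only implementation such a plan could have used.
    static SinkUpsertMaterializeStrategy resolve(SinkUpsertMaterializeStrategy fromPlan) {
        return fromPlan == null ? SinkUpsertMaterializeStrategy.LEGACY : fromPlan;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve(SinkUpsertMaterializeStrategy.MAP));
    }
}
```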



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java:
##########
@@ -82,7 +88,8 @@ public void setRowKind(RowKind kind) {
 
     @Override
     public boolean isNullAt(int pos) {
-        return row.isNullAt(indexMapping[pos]);
+        return (pos >= indexMapping.length && isNullAtNonProjected)
+                || row.isNullAt(indexMapping[pos]);

Review Comment:
   nit: some trivial test coverage for this maybe?
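A sketch of the cases such a test could cover. How the PR sets `isNullAtNonProjected` is not visible here, so this uses a hypothetical stand-in (`ProjectedIsNullSketch`) that copies the diffed expression over a plain `boolean[]` instead of a wrapped `RowData`; a real test would exercise `ProjectedRowData` itself. Note that with the flag unset, an out-of-range position still reaches `indexMapping[pos]` and throws, which a test could pin down.

```java
final class ProjectedIsNullSketch {

    // Stand-in state: rowNulls[i] says whether field i of the wrapped row is null.
    private final int[] indexMapping;
    private final boolean isNullAtNonProjected;
    private final boolean[] rowNulls;

    ProjectedIsNullSketch(int[] indexMapping, boolean isNullAtNonProjected, boolean[] rowNulls) {
        this.indexMapping = indexMapping;
        this.isNullAtNonProjected = isNullAtNonProjected;
        this.rowNulls = rowNulls;
    }

    // Same expression as the diff, with row.isNullAt(i) replaced by rowNulls[i].
    boolean isNullAt(int pos) {
        return (pos >= indexMapping.length && isNullAtNonProjected) || rowNulls[indexMapping[pos]];
    }

    public static void main(String[] args) {
        // Projection {2, 0} over a 3-field row whose field 2 is null.
        boolean[] row = {false, false, true};
        ProjectedIsNullSketch withFlag = new ProjectedIsNullSketch(new int[] {2, 0}, true, row);
        System.out.println(withFlag.isNullAt(0)); // projected, underlying field 2 is null
        System.out.println(withFlag.isNullAt(1)); // projected, underlying field 0 is not null
        System.out.println(withFlag.isNullAt(5)); // beyond the projection, flag set

        ProjectedIsNullSketch withoutFlag = new ProjectedIsNullSketch(new int[] {2, 0}, false, row);
        try {
            withoutFlag.isNullAt(5);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("out of range without flag: " + e.getClass().getSimpleName());
        }
    }
}
```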



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,58 @@ public class ExecutionConfigOptions {
                                                     + "or force materialization(FORCE).")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")

Review Comment:
   Shouldn't this be randomized in the ITCases? 🤔 I'm not sure how well our configuration randomization framework works with SQL/Table API tests.
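Independent of what the existing randomization framework supports, a minimal sketch of the idea: pick the strategy from a seeded `Random`, so a failing ITCase can be reproduced from the logged seed, and leave the key alone when a test sets it explicitly. The plain `Map<String, String>` config, the class name, and the local enum (using the strategy names mentioned in this thread) are assumptions; the key string is the one from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

final class StrategyRandomizationSketch {

    // Hypothetical stand-in for SinkUpsertMaterializeStrategy.
    enum SinkUpsertMaterializeStrategy {
        LEGACY,
        MAP,
        VALUE,
        ADAPTIVE
    }

    // Key taken from the option definition in the diff.
    static final String STRATEGY_KEY = "table.exec.sink.upsert-materialize-strategy.type";

    // Returns a copy of `conf` with a strategy drawn from a seeded Random,
    // unless the test already set the key explicitly.
    static Map<String, String> randomize(Map<String, String> conf, long seed) {
        SinkUpsertMaterializeStrategy[] all = SinkUpsertMaterializeStrategy.values();
        SinkUpsertMaterializeStrategy picked = all[new Random(seed).nextInt(all.length)];
        Map<String, String> result = new HashMap<>(conf);
        result.putIfAbsent(STRATEGY_KEY, picked.name());
        return result;
    }

    public static void main(String[] args) {
        long seed = System.nanoTime();
        // Log the seed so a failing run can be replayed with the same strategy.
        System.out.println("seed=" + seed + " -> " + randomize(new HashMap<>(), seed));
    }
}
```

In an actual ITCase the chosen value would then be applied to the table configuration before the job is built.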



-- 
This is an automated message from the Apache Git Service.