pnowojski commented on code in PR #27070:
URL: https://github.com/apache/flink/pull/27070#discussion_r2440127318
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -883,6 +935,43 @@ public enum RetryStrategy {
         FIXED_DELAY
     }
+    /** SinkUpsertMaterializer strategy. */
+    @PublicEvolving
+    public enum SinkUpsertMaterializeStrategy {
+        /**
+         * Simple implementation based on {@code ValueState<List>} (the original implementation).
+         *
+         * <ul>
+         *   <li>optimal for cases with history under approx. 100 elements
+         *   <li>limited TTL support (per key granularity, i.e. no expiration for old history
+         *       elements)
+         * </ul>
+         */
+        LEGACY,
+        /**
+         * OrderedMultiSetState-based implementation based on a combination of several MapState
+         * maintaining ordering and fast lookup properties.
+         *
+         * <ul>
+         *   <li>faster and more memory-efficient on long histories
+         *   <li>slower on short histories
+         *   <li>currently, no TTL support (to be added in the future)
+         *   <li>requires more space
+         * </ul>
+         */
+        MAP,
+        /**
+         * Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.
+         */
Review Comment:
Does it mean that `LEGACY` doesn't support switching to anything else?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,58 @@ public class ExecutionConfigOptions {
                                     + "or force materialization(FORCE).")
                             .build());
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")
+                                            .linebreak()
+                                            .text(
+                                                    "LEGACY: Simple implementation based on ValueState<List> (the original implementation).")
Review Comment:
nit: instead of hardcoding `LEGACY`, I would use
`SinkUpsertMaterializeStrategy.LEGACY.toString()` (here and in other places).
It will be easier in case of refactorings.
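A minimal sketch of the suggestion, using a stand-in enum rather than the real Flink classes. The description text is assembled from the enum constants, so renaming a constant changes the documentation with it. `StrategyDescriptionSketch` and `describe` are hypothetical names, and only the two constants fully visible in the diff are included:

```java
import java.util.EnumMap;
import java.util.Map;

// Stand-in for the enum added in the PR; NOT the real Flink class.
enum SinkUpsertMaterializeStrategy {
    LEGACY,
    MAP
}

class StrategyDescriptionSketch {

    // Builds the option description from the enum constants instead of hardcoded names.
    static String describe() {
        Map<SinkUpsertMaterializeStrategy, String> details =
                new EnumMap<>(SinkUpsertMaterializeStrategy.class);
        details.put(
                SinkUpsertMaterializeStrategy.LEGACY,
                "Simple implementation based on ValueState<List> (the original implementation).");
        details.put(
                SinkUpsertMaterializeStrategy.MAP,
                "OrderedMultiSetState-based implementation based on a combination of several MapState.");

        StringBuilder sb =
                new StringBuilder(
                        "Which strategy of SinkUpsertMaterializer to use. Supported strategies:");
        for (Map.Entry<SinkUpsertMaterializeStrategy, String> e : details.entrySet()) {
            // toString() follows the constant name, so a rename shows up here automatically.
            sb.append('\n').append(e.getKey().toString()).append(": ").append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

In the real option this would presumably be one `.text(...)` call per strategy, with the constant name concatenated in front of the existing sentence.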
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,58 @@ public class ExecutionConfigOptions {
                                     + "or force materialization(FORCE).")
                             .build());
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")
Review Comment:
Can you change between strategies?
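For context on the two thresholds quoted in the hunk above: their descriptions read like a hysteresis band, where a key moves to MAP once its history reaches the high threshold and back to VALUE once it shrinks to the low one. The sketch below only illustrates that reading and says nothing about switching the configured strategy of an existing job. `AdaptiveThresholdSketch`, `onSizeChange`, the inclusive comparisons, and starting in VALUE are all assumptions, not taken from the PR:

```java
// Hypothetical illustration of the low/high thresholds; not the PR's implementation.
class AdaptiveThresholdSketch {

    enum Impl {
        VALUE,
        MAP
    }

    private final long low; // at or below this many entries: MAP -> VALUE
    private final long high; // at or above this many entries: VALUE -> MAP
    private Impl current = Impl.VALUE; // assumption: keys start with the VALUE implementation

    AdaptiveThresholdSketch(long low, long high) {
        this.low = low;
        this.high = high;
    }

    // Returns the implementation to use after the history of one key changed size.
    Impl onSizeChange(long entriesForKey) {
        if (current == Impl.VALUE && entriesForKey >= high) {
            current = Impl.MAP;
        } else if (current == Impl.MAP && entriesForKey <= low) {
            current = Impl.VALUE;
        }
        // Between the thresholds the current choice is kept, which avoids flapping.
        return current;
    }

    public static void main(String[] args) {
        // 300/400 are the hashmap state backend defaults named in the option descriptions.
        AdaptiveThresholdSketch sketch = new AdaptiveThresholdSketch(300, 400);
        for (long size : new long[] {350, 400, 350, 300}) {
            System.out.println(size + " -> " + sketch.onSizeChange(size));
        }
    }
}
```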
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -104,6 +118,10 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
     private final boolean upsertMaterialize;
+    @JsonProperty(FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final SinkUpsertMaterializeStrategy upsertMaterializeStrategy;
Review Comment:
I take it that this code doesn't support any migration between different
strategies? There isn't even a way to change `VALUE`->`ADAPTIVE`?
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java:
##########
@@ -82,7 +88,8 @@ public void setRowKind(RowKind kind) {
     @Override
     public boolean isNullAt(int pos) {
-        return row.isNullAt(indexMapping[pos]);
+        return (pos >= indexMapping.length && isNullAtNonProjected)
+                || row.isNullAt(indexMapping[pos]);
Review Comment:
nit: some trivial test coverage for this maybe?
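One possible shape for such a test, sketched against a stand-in class that copies only the `isNullAt` expression from the hunk above. `ProjectedNullCheckSketch`, its constructor, and the `boolean[]` standing in for the wrapped row are hypothetical, not Flink API. The sketch also exercises the case where `pos` is beyond the mapping while `isNullAtNonProjected` is false: as written, the expression falls through to `indexMapping[pos]` and throws `ArrayIndexOutOfBoundsException`. Whether that is the intended behaviour is for the PR author to confirm.

```java
// Stand-in that mirrors only the isNullAt expression from the diff; not the real ProjectedRowData.
class ProjectedNullCheckSketch {

    private final int[] indexMapping;
    private final boolean isNullAtNonProjected;
    private final boolean[] rowNulls; // rowNulls[i] == true means field i of the wrapped row is null

    ProjectedNullCheckSketch(int[] indexMapping, boolean isNullAtNonProjected, boolean[] rowNulls) {
        this.indexMapping = indexMapping;
        this.isNullAtNonProjected = isNullAtNonProjected;
        this.rowNulls = rowNulls;
    }

    boolean isNullAt(int pos) {
        return (pos >= indexMapping.length && isNullAtNonProjected)
                || rowNulls[indexMapping[pos]];
    }

    public static void main(String[] args) {
        boolean[] rowNulls = {false, true, false};
        int[] mapping = {2, 1}; // projected position 0 reads row field 2, position 1 reads row field 1

        ProjectedNullCheckSketch lenient = new ProjectedNullCheckSketch(mapping, true, rowNulls);
        System.out.println("pos 0: " + lenient.isNullAt(0)); // mapped to a non-null field
        System.out.println("pos 1: " + lenient.isNullAt(1)); // mapped to a null field
        System.out.println("pos 5: " + lenient.isNullAt(5)); // outside the mapping, flag set

        ProjectedNullCheckSketch strict = new ProjectedNullCheckSketch(mapping, false, rowNulls);
        try {
            strict.isNullAt(5); // outside the mapping, flag not set
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("pos 5 without the flag throws: " + e.getClass().getSimpleName());
        }
    }
}
```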
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,58 @@ public class ExecutionConfigOptions {
                                     + "or force materialization(FORCE).")
                             .build());
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
Review Comment:
Shouldn't this be randomized for the ITCases? 🤔 Dunno how well the
configuration randomisation framework that we have works with SQL/Table API
tests.