1996fanrui commented on code in PR #27070:
URL: https://github.com/apache/flink/pull/27070#discussion_r2433024885
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,66 @@ public class ExecutionConfigOptions {
+ "or force
materialization(FORCE).")
.build());
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+
key("table.exec.sink.upsert-materialize.adaptive.threshold.low")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using
strategy=ADAPTIVE, defines the number of entries per key when the
implementation is changed from MAP to VALUE. If not specified, Flink uses
state-backend specific defaults.")
+ .linebreak()
+ .text("The option takes effect
during job (re)starting")
+ .linebreak()
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+
key("table.exec.sink.upsert-materialize.adaptive.threshold.high")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using
strategy=ADAPTIVE, defines the number of entries per key when the
implementation is changed from VALUE to MAP. If not specified, Flink uses
state-backend specific defaults.")
+ .linebreak()
+ .text("The option takes effect
during job (re)starting")
+ .linebreak()
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<SinkUpsertMaterializeStrategy>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+ key("table.exec.sink.upsert-materialize.strategy")
+ .enumType(SinkUpsertMaterializeStrategy.class)
+ .noDefaultValue()
Review Comment:
The description says `The default is LEGACY`, so why `noDefaultValue()` here?
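If `LEGACY` really is the intended default, a minimal sketch of the explicit alternative could look like this (illustration only, not part of the PR):
```java
// Sketch: declare the documented default instead of noDefaultValue().
key("table.exec.sink.upsert-materialize.strategy")
        .enumType(SinkUpsertMaterializeStrategy.class)
        .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
```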
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalOverAggregateRule.scala:
##########
@@ -27,6 +27,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalOv
 import org.apache.flink.table.planner.plan.utils.{AggregateUtil, OverAggregateUtil, SortUtil}
 import org.apache.flink.table.planner.plan.utils.PythonUtil.isPythonAggregate
 import org.apache.flink.table.planner.utils.ShortcutUtils
+import org.apache.flink.table.typeutils.RowTypeUtils
Review Comment:
This file does not change any other code, so why is the import needed? Was it added by mistake, or am I missing something?
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,66 @@ public class ExecutionConfigOptions {
+ "or force
materialization(FORCE).")
.build());
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+
key("table.exec.sink.upsert-materialize.adaptive.threshold.low")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using
strategy=ADAPTIVE, defines the number of entries per key when the
implementation is changed from MAP to VALUE. If not specified, Flink uses
state-backend specific defaults.")
+ .linebreak()
+ .text("The option takes effect
during job (re)starting")
+ .linebreak()
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+
key("table.exec.sink.upsert-materialize.adaptive.threshold.high")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using
strategy=ADAPTIVE, defines the number of entries per key when the
implementation is changed from VALUE to MAP. If not specified, Flink uses
state-backend specific defaults.")
+ .linebreak()
+ .text("The option takes effect
during job (re)starting")
Review Comment:
I do not understand why `"The option takes effect during job (re)starting"` is mentioned. As I understand it, almost all config options take effect after a job (re)start.
Do you want to point out that these 2 config options cannot be changed via dynamic configuration?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -243,16 +268,37 @@ protected Transformation<RowData> applyUpsertMaterialize(
                                 classLoader)
                         .generateRecordEqualiser("SinkMaterializeUpsertKeyEqualiser");
-        final long stateRetentionTime =
-                StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);
+        GeneratedHashFunction rowHashFunction =
+                HashCodeGenerator.generateRowHash(
+                        new CodeGeneratorContext(config, classLoader),
+                        physicalRowType,
+                        "hashCode",
+                        IntStream.range(0, physicalRowType.getFieldCount()).toArray());
-        SinkUpsertMaterializer operator =
-                new SinkUpsertMaterializer(
-                        StateConfigUtil.createTtlConfig(stateRetentionTime),
-                        InternalSerializers.create(physicalRowType),
-                        rowEqualiser,
+        final GeneratedHashFunction upsertKeyHashFunction =
+                inputUpsertKey == null
+                        ? null
+                        : HashCodeGenerator.generateRowHash(
+                                new CodeGeneratorContext(config, classLoader),
+                                RowTypeUtils.projectRowType(physicalRowType, inputUpsertKey),
+                                "",
Review Comment:
how about setting a name here?
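For example, a name following the convention of the generated equaliser above (the exact identifier is only an illustration):
```java
// hypothetical replacement for the empty string argument:
"SinkMaterializeUpsertKeyHash",
```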
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -159,6 +159,66 @@ public class ExecutionConfigOptions {
+ "or force
materialization(FORCE).")
.build());
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+
key("table.exec.sink.upsert-materialize.adaptive.threshold.low")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using
strategy=ADAPTIVE, defines the number of entries per key when the
implementation is changed from MAP to VALUE. If not specified, Flink uses
state-backend specific defaults.")
+ .linebreak()
+ .text("The option takes effect
during job (re)starting")
+ .linebreak()
+ .build());
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<Long>
+ TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+
key("table.exec.sink.upsert-materialize.adaptive.threshold.high")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "When using
strategy=ADAPTIVE, defines the number of entries per key when the
implementation is changed from VALUE to MAP. If not specified, Flink uses
state-backend specific defaults.")
Review Comment:
Would you mind mentioning the specific default value for each state backend?
##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java:
##########
@@ -360,8 +409,44 @@ private OneInputStreamOperator<RowData, RowData> createOperatorWithoutUpsertKey(
     private OneInputStreamOperator<RowData, RowData> createOperator(
             LogicalType[] types, int... upsertKey) {
-        return SinkUpsertMaterializer.create(
-                TTL_CONFIG, RowType.of(types), EQUALISER, UPSERT_KEY_EQUALISER, upsertKey);
+        switch (strategy) {
+            case LEGACY:
+                return SinkUpsertMaterializer.create(
+                        TTL_CONFIG, RowType.of(types), EQUALISER, UPSERT_KEY_EQUALISER, upsertKey);
+            case MAP:
+                return createV2(
+                        types,
+                        upsertKey,
+                        SequencedMultiSetStateConfig.forMap(
+                                PROCESSING_TIME, StateTtlConfig.DISABLED));
+            case VALUE:
+                return createV2(
+                        types,
+                        upsertKey,
+                        SequencedMultiSetStateConfig.forValue(
+                                PROCESSING_TIME, StateTtlConfig.DISABLED));
+            case ADAPTIVE:
+                return createV2(
+                        types,
+                        upsertKey,
+                        SequencedMultiSetStateConfig.adaptive(
+                                PROCESSING_TIME, 10L, 5L, StateTtlConfig.DISABLED));
Review Comment:
how about extracting a method that creates the `StateTtlConfig` via an `isTtlSupported()` method?
It helps keep the code consistent: only `isTtlSupported()` would need to change in the future when TTL is supported.
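A rough sketch of that extraction, assuming the test keeps `TTL_CONFIG` for the `LEGACY` strategy and disables TTL elsewhere (method and type names beyond `isTtlSupported()` are illustrative):
```java
private StateTtlConfig createTtlConfig() {
    // Centralizes the TTL decision; once the new strategies support TTL,
    // only isTtlSupported() needs to change.
    return isTtlSupported() ? TTL_CONFIG : StateTtlConfig.DISABLED;
}

private boolean isTtlSupported() {
    return strategy == SinkUpsertMaterializeStrategy.LEGACY;
}
```
Each `SequencedMultiSetStateConfig.forMap/forValue/adaptive(...)` call could then pass `createTtlConfig()` instead of `StateTtlConfig.DISABLED`.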
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java:
##########
@@ -186,6 +193,15 @@ public String toString() {
                 + '}';
Review Comment:
nit: how about adding `isNullAtNonProjected` into `toString()` as well?
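A sketch of what that could look like, assuming the new flag field is named `isNullAtNonProjected` (the surrounding `toString()` fields are not shown in the hunk):
```java
// hypothetical: append the new flag before the closing brace
+ ", isNullAtNonProjected=" + isNullAtNonProjected
+ '}';
```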
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerV2.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState.StateChangeInfo;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
+import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.RowTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An operator that maintains incoming records in state corresponding to the upsert keys and
+ * generates an upsert view for the downstream operator.
+ *
+ * <ul>
+ *   <li>Adds an insertion to state and emits it with an updated {@link RowKind}.
+ *   <li>Applies a deletion to state.
+ *   <li>Emits a deletion with an updated {@link RowKind} iff it affects the last record or the
+ *       state is empty afterward. A deletion of an already updated record is swallowed.
+ * </ul>
+ */
+@Internal
+public class SinkUpsertMaterializerV2 extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(SinkUpsertMaterializerV2.class);
+
+    private final SequencedMultiSetStateContext stateParameters;
+
+    // Buffer of emitted insertions on which deletions will be applied first.
+    // The row kind might be +I or +U and will be ignored when applying the deletion.
+    private transient TimestampedCollector<RowData> collector;
+
+    private transient SequencedMultiSetState<RowData> orderedMultiSetState;
+    private final boolean hasUpsertKey;
+
+    public SinkUpsertMaterializerV2(
+            boolean hasUpsertKey, SequencedMultiSetStateContext stateParameters) {
+        this.hasUpsertKey = hasUpsertKey;
+        this.stateParameters = stateParameters;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        orderedMultiSetState =
+                SequencedMultiSetState.create(
+                        stateParameters,
+                        getRuntimeContext(),
+                        getKeyedStateStore().getBackendTypeIdentifier());
+        collector = new TimestampedCollector<>(output);
+        LOG.info("Opened {} with upsert key: {}", this.getClass().getSimpleName(), hasUpsertKey);
+    }
+
+    @SuppressWarnings("OptionalGetWithoutIsPresent")
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        final RowData row = element.getValue();
+        final long timestamp = element.getTimestamp();
+
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                if (hasUpsertKey) {
+                    collect(row, orderedMultiSetState.add(row, timestamp).wasEmpty());
+                } else {
+                    collect(row, orderedMultiSetState.append(row, timestamp).wasEmpty());
+                }
+                break;
+
+            case UPDATE_BEFORE:
+            case DELETE:
+                StateChangeInfo<RowData> removalResult = orderedMultiSetState.remove(row);
+                switch (removalResult.getChangeType()) {
+                    case REMOVAL_OTHER:
+                        // do nothing;
+                        break;
+                    case REMOVAL_NOT_FOUND:
+                        // not logging the record for security reasons
+                        LOG.warn("Not found record to retract");
+                        break;
+                    case REMOVAL_ALL:
+                        collect(removalResult.getPayload().get(), RowKind.DELETE);
+                        break;
+                    case REMOVAL_LAST_ADDED:
+                        collect(removalResult.getPayload().get(), RowKind.UPDATE_AFTER);
+                        break;
+                    default:
+                        throw new IllegalArgumentException(
+                                "Unexpected removal result type: " + removalResult.getChangeType());
+                }
+        }
+    }
+
+    private void collect(RowData row, boolean notExisted) {
+        collect(row, getRowKind(notExisted));
+    }
+
+    private RowKind getRowKind(boolean notExisted) {
+        return notExisted ? RowKind.INSERT : RowKind.UPDATE_AFTER;
+    }
+
+    private void collect(RowData row, RowKind withKind) {
+        RowKind orig = row.getRowKind();
+        row.setRowKind(withKind);
+        collector.collect(row);
+        row.setRowKind(orig);
+    }
+
+    public static SinkUpsertMaterializerV2 create(
+            RowType physicalRowType,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            GeneratedHashFunction rowHashFunction,
+            GeneratedHashFunction upsertKeyHashFunction,
+            int[] inputUpsertKey,
+            SequencedMultiSetStateConfig stateSettings) {
+
+        boolean hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+
+        return new SinkUpsertMaterializerV2(
+                hasUpsertKey,
+                new SequencedMultiSetStateContext(
+                        checkNotNull(
+                                hasUpsertKey
+                                        ? InternalSerializers.create(
+                                                RowTypeUtils.projectRowType(
+                                                        physicalRowType, inputUpsertKey))
+                                        : InternalSerializers.create(physicalRowType)),
+                        checkNotNull(hasUpsertKey ? upsertKeyEqualiser : rowEqualiser),
+                        checkNotNull(hasUpsertKey ? upsertKeyHashFunction : rowHashFunction),
+                        InternalSerializers.create(physicalRowType),
+                        row ->
+                                hasUpsertKey
+                                        ? ProjectedRowData.from(inputUpsertKey)
+                                                .withNullAtNonProjected(true)
+                                                .replaceRow(row)
+                                        : row,
Review Comment:
I have 2 comments about this part since it is a hot code path:
1. `withNullAtNonProjected` creates a new `ProjectedRowData` object, so 2 `ProjectedRowData` objects are created for each row.
2. In `SinkUpsertMaterializer`, the `ProjectedRowData` is created in `open()` [1] and then reused for all rows. How about creating the `ProjectedRowData` only once here as well?

[1] https://github.com/apache/flink/blob/e44d6387dbac2f431f918bdc2522ad8d680c8ca7/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java#L123
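A minimal sketch of that reuse, with the projection allocated once and only `replaceRow` on the hot path (names and placement are illustrative, not the actual PR code):
```java
// allocated once when hasUpsertKey is true, e.g. in open() or when
// building the state context:
final ProjectedRowData upsertKeyProjection =
        ProjectedRowData.from(inputUpsertKey).withNullAtNonProjected(true);

// hot path: no per-record allocation, only the backing row is swapped
row -> hasUpsertKey ? upsertKeyProjection.replaceRow(row) : row,
```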
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala:
##########
@@ -102,13 +103,17 @@ class StreamPhysicalSink(
       .reuseOrCreate(cluster.getMetadataQuery)
       .getUpsertKeys(inputRel)
+    val config = unwrapTableConfig(this)
     new StreamExecSink(
-      unwrapTableConfig(this),
+      config,
       tableSinkSpec,
       inputChangelogMode,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       upsertMaterialize,
+      // persist the upsertMaterialize strategy separately in the compiled plan to make it
+      // immutable; later on, it can't be obtained from the node config because that is merged
+      // with the new environment
+      config.getOptional(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY).orElse(null),
Review Comment:
Why can it be null?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -280,4 +326,64 @@ protected Transformation<RowData> applyUpsertMaterialize(
         materializeTransform.setStateKeyType(keySelector.getProducedType());
         return materializeTransform;
     }
+
+    private OneInputStreamOperator<RowData, RowData> createSumOperator(
+            ExecNodeConfig config,
+            RowType physicalRowType,
+            int[] inputUpsertKey,
+            GeneratedRecordEqualiser upsertKeyEqualiser,
+            GeneratedHashFunction upsertKeyHashFunction,
+            StateTtlConfig ttlConfig,
+            GeneratedRecordEqualiser rowEqualiser,
+            GeneratedHashFunction rowHashFunction) {
+
+        SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
+                Optional.ofNullable(upsertMaterializeStrategy)
+                        .orElse(SinkUpsertMaterializeStrategy.LEGACY);
+
+        return sinkUpsertMaterializeStrategy == SinkUpsertMaterializeStrategy.LEGACY
+                ? SinkUpsertMaterializer.create(
+                        ttlConfig,
+                        physicalRowType,
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        inputUpsertKey)
+                : SinkUpsertMaterializerV2.create(
+                        physicalRowType,
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        rowHashFunction,
+                        upsertKeyHashFunction,
+                        inputUpsertKey,
+                        createStateConfig(
+                                sinkUpsertMaterializeStrategy,
+                                TimeDomain.EVENT_TIME,
+                                ttlConfig,
+                                config));
+    }
+
+    private static SequencedMultiSetStateConfig createStateConfig(
+            SinkUpsertMaterializeStrategy strategy,
+            TimeDomain ttlTimeDomain,
+            StateTtlConfig ttlConfig,
+            ReadableConfig config) {
+        switch (strategy) {
+            case VALUE:
+                return SequencedMultiSetStateConfig.forValue(ttlTimeDomain, ttlConfig);
+            case MAP:
+                return SequencedMultiSetStateConfig.forMap(ttlTimeDomain, ttlConfig);
+            case ADAPTIVE:
+                return SequencedMultiSetStateConfig.adaptive(
+                        ttlTimeDomain,
+                        config.getOptional(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH)
+                                .orElse(null),
+                        config.getOptional(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW)
+                                .orElse(null),
Review Comment:
```suggestion
config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH),
config.get(TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW),
```
If they are not set and there is no default value, `get` will return null directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]