luoyuxia commented on code in PR #22593:
URL: https://github.com/apache/flink/pull/22593#discussion_r1214112329
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
##########
@@ -84,22 +84,23 @@ public StateMetadata(
stateIndex,
TimeUtils.parseDuration(
Preconditions.checkNotNull(stateTtl, "state ttl should
not be null")),
- Preconditions.checkNotNull(stateName, "state name should not
be null"));
+ stateName);
}
- public StateMetadata(int stateIndex, @Nonnull Duration stateTtl, @Nonnull
String stateName) {
+ public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
Preconditions.checkArgument(stateIndex >= 0, "state index should start
from 0");
this.stateIndex = stateIndex;
- this.stateTtl = stateTtl;
- this.stateName = stateName;
+ this.stateTtl = Preconditions.checkNotNull(stateTtl, "state ttl should
not be null");
+ this.stateName = Preconditions.checkNotNull(stateName, "state name
should not be null");
}
public int getStateIndex() {
return stateIndex;
}
- public long getStateTtl() {
- return stateTtl.toMillis();
+ @JsonGetter(value = "ttl")
+ public String getStateTtl() {
+ return TimeUtils.formatWithHighestUnit(stateTtl);
}
public String getStateName() {
Review Comment:
Can this method removed?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java:
##########
@@ -256,12 +276,17 @@ private static void checkUidModification(
// Helper methods
//
--------------------------------------------------------------------------------------------
- private static CompiledPlan minimalPlan(TableEnvironment env) {
+ private static CompiledPlan planFromFlink1_18(TableEnvironment env) {
Review Comment:
rename to `planFromCurrentFlink`?
As release moves on, the name `planFromFlink1_18` may be out of date.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -332,7 +319,7 @@ protected Transformation<RowData> createJoinTransformation(
}
}
- private Transformation<RowData> createSyncLookupJoinWithState(
+ protected Transformation<RowData> createSyncLookupJoinWithState(
Review Comment:
Is there any strong reason to make it proected and make
`createSyncLookupJoinWithState`?
For me, it's not so much like a base method to be overrided by children
classes at least for `BatchExecLookupJoin` since there won't be
LookupJoinWithState in `BatchExecLookupJoin` .
If we do want to make it a base method , I think we should throw an
exception in default implementation to avoid the method be called by mistake
but without specific implemtation.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.table.planner.utils.JsonTestUtils;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for configuring operator-level state TTL via {@link
+ * org.apache.flink.table.api.CompiledPlan}.
+ */
+public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase
{
+
+ @Test
+ public void testDifferentStateTtlForDifferentOneInputOperator() throws
Exception {
+ String dataId =
+ TestValuesTableFactory.registerRowData(
+ Arrays.asList(
+ GenericRowData.of(1,
StringData.fromString("Tom"), 1, 199.9d),
+ GenericRowData.of(2,
StringData.fromString("Jerry"), 2, 99.9d),
+ GenericRowData.of(1,
StringData.fromString("Tom"), 1, 199.9d),
+ GenericRowData.of(3,
StringData.fromString("Tom"), 1, 29.9d),
+ GenericRowData.of(4,
StringData.fromString("Olivia"), 1, 100d),
+ GenericRowData.of(4,
StringData.fromString("Olivia"), 1, 100d),
+ GenericRowData.of(2,
StringData.fromString("Jerry"), 2, 99.9d),
+ GenericRowData.of(5,
StringData.fromString("Michael"), 3, 599.9d),
+ GenericRowData.of(6,
StringData.fromString("Olivia"), 3, 1000d)));
+ createTestSourceTable(
+ "Orders",
+ new String[] {
+ "`order_id` INT", "`buyer` STRING", "`quantity` INT",
"`amount` DOUBLE"
+ },
+ null,
+ getProperties(dataId, 1, "2s"));
+
+ createTestNonInsertOnlyValuesSinkTable(
+ "OrdersStats",
+ "`buyer` STRING",
+ "`ord_cnt` BIGINT",
+ "`quantity_cnt` BIGINT",
+ "`total_amount` DOUBLE");
+ compileSqlAndExecutePlan(
+ "INSERT INTO OrdersStats \n"
+ + "SELECT buyer, COUNT(1) AS ord_cnt,
SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount FROM (\n"
+ + "SELECT *, ROW_NUMBER() OVER(PARTITION BY
order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders)
tmp\n"
+ + "WHERE rk = 1\n"
+ + "GROUP BY buyer",
+ json -> {
+ try {
+ JsonNode target =
JsonTestUtils.readFromString(json);
+ JsonTestUtils.setExecNodeStateMetadata(
+ target, "stream-exec-deduplicate", 0,
6000L);
+ JsonTestUtils.setExecNodeStateMetadata(
Review Comment:
I'm wondering whether it'll be better to change it `8000L`.
The reason is that if we remove this code line, the test can still passs so
that we can't make sure set state for `stream-exec-group-aggregate` also make
difference.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java:
##########
@@ -67,11 +68,18 @@ public void after() {
}
protected TableResult compileSqlAndExecutePlan(String sql) {
+ return compileSqlAndExecutePlan(sql, json -> json);
+ }
+
+ protected TableResult compileSqlAndExecutePlan(
+ String sql, Function<String, String> planJsonTransformer) {
Review Comment:
nit:
`jsonPlanTransformer`?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java:
##########
@@ -71,20 +75,36 @@
producedTransformations =
StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
+@ExecNodeMetadata(
+ name = "stream-exec-changelog-normalize",
+ version = 2,
Review Comment:
After reading the doc of the method `minPlanVersion`
> Needs to be updated when the JSON for the {@link ExecNode} changes: e.g.
after adding an
attribute to the JSON spec of the ExecNode.
I think we need introduce a new version since the json for the node changes.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
##########
@@ -84,22 +84,23 @@ public StateMetadata(
stateIndex,
TimeUtils.parseDuration(
Preconditions.checkNotNull(stateTtl, "state ttl should
not be null")),
- Preconditions.checkNotNull(stateName, "state name should not
be null"));
+ stateName);
}
- public StateMetadata(int stateIndex, @Nonnull Duration stateTtl, @Nonnull
String stateName) {
+ public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
Preconditions.checkArgument(stateIndex >= 0, "state index should start
from 0");
this.stateIndex = stateIndex;
- this.stateTtl = stateTtl;
- this.stateName = stateName;
+ this.stateTtl = Preconditions.checkNotNull(stateTtl, "state ttl should
not be null");
+ this.stateName = Preconditions.checkNotNull(stateName, "state name
should not be null");
}
public int getStateIndex() {
return stateIndex;
}
- public long getStateTtl() {
- return stateTtl.toMillis();
+ @JsonGetter(value = "ttl")
Review Comment:
nit:
use `FIELD_NAME_STATE_TTL`?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java:
##########
@@ -91,6 +107,7 @@ public class StreamExecDeduplicate extends
ExecNodeBase<RowData>
public static final String FIELD_NAME_IS_ROWTIME = "isRowtime";
public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE =
"generateUpdateBefore";
+ public static final String STATE_NAME = "deduplicateState";
Review Comment:
Just double check, from the exampe in the
[FLIP-292](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951),
seems the name is `deduplicate-state`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]