luoyuxia commented on code in PR #22593:
URL: https://github.com/apache/flink/pull/22593#discussion_r1214112329


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
##########
@@ -84,22 +84,23 @@ public StateMetadata(
                 stateIndex,
                 TimeUtils.parseDuration(
                         Preconditions.checkNotNull(stateTtl, "state ttl should not be null")),
-                Preconditions.checkNotNull(stateName, "state name should not be null"));
+                stateName);
     }
 
-    public StateMetadata(int stateIndex, @Nonnull Duration stateTtl, @Nonnull String stateName) {
+    public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
         Preconditions.checkArgument(stateIndex >= 0, "state index should start from 0");
         this.stateIndex = stateIndex;
-        this.stateTtl = stateTtl;
-        this.stateName = stateName;
+        this.stateTtl = Preconditions.checkNotNull(stateTtl, "state ttl should not be null");
+        this.stateName = Preconditions.checkNotNull(stateName, "state name should not be null");
     }
 
     public int getStateIndex() {
         return stateIndex;
     }
 
-    public long getStateTtl() {
-        return stateTtl.toMillis();
+    @JsonGetter(value = "ttl")
+    public String getStateTtl() {
+        return TimeUtils.formatWithHighestUnit(stateTtl);
     }
 
     public String getStateName() {

Review Comment:
   Can this method be removed?
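   The diff above moves the null checks into the canonical constructor so that every delegating constructor is covered automatically. A minimal stand-alone sketch of that pattern (the class name, `describe` helper, and messages are illustrative, not the Flink code):

   ```java
   import java.time.Duration;
   import java.util.Objects;

   // Sketch only: all validation lives in the canonical constructor, so
   // delegating constructors inherit the checks instead of repeating them.
   final class StateMetadataSketch {
       private final int stateIndex;
       private final Duration stateTtl;
       private final String stateName;

       // Delegating constructor: relies on the canonical one for null checks.
       StateMetadataSketch(int stateIndex, String stateTtl, String stateName) {
           this(stateIndex, Duration.parse(stateTtl), stateName);
       }

       // Canonical constructor: the single place where arguments are validated.
       StateMetadataSketch(int stateIndex, Duration stateTtl, String stateName) {
           if (stateIndex < 0) {
               throw new IllegalArgumentException("state index should start from 0");
           }
           this.stateIndex = stateIndex;
           this.stateTtl = Objects.requireNonNull(stateTtl, "state ttl should not be null");
           this.stateName = Objects.requireNonNull(stateName, "state name should not be null");
       }

       String describe() {
           return stateName + "@" + stateIndex + " ttl=" + stateTtl.toMillis() + "ms";
       }
   }
   ```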



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java:
##########
@@ -256,12 +276,17 @@ private static void checkUidModification(
     // Helper methods
     // --------------------------------------------------------------------------------------------
 
-    private static CompiledPlan minimalPlan(TableEnvironment env) {
+    private static CompiledPlan planFromFlink1_18(TableEnvironment env) {

Review Comment:
   rename to `planFromCurrentFlink`?
   As releases move on, the name `planFromFlink1_18` may become out of date.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -332,7 +319,7 @@ protected Transformation<RowData> createJoinTransformation(
         }
     }
 
-    private Transformation<RowData> createSyncLookupJoinWithState(
+    protected Transformation<RowData> createSyncLookupJoinWithState(

Review Comment:
   Is there any strong reason to make `createSyncLookupJoinWithState` protected? 
   To me, it does not look like a base method meant to be overridden by child classes, at least not for `BatchExecLookupJoin`, since there won't be a lookup join with state in `BatchExecLookupJoin`.
   
   If we do want to make it a base method, I think we should throw an exception in the default implementation to avoid the method being called by mistake without a specific implementation.
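   A stand-alone sketch of the fail-fast default suggested here (class names are illustrative, not the actual Flink types): the base class throws from the hook, so a subclass that has no state variant fails loudly on any accidental call.

   ```java
   // Sketch only: base method deliberately throws so that subclasses without
   // a stateful variant cannot silently misuse the hook.
   abstract class LookupJoinBase {
       protected String createSyncLookupJoinWithState() {
           throw new UnsupportedOperationException(
                   "createSyncLookupJoinWithState is not supported by "
                           + getClass().getSimpleName());
       }
   }

   class StreamLookupJoin extends LookupJoinBase {
       @Override
       protected String createSyncLookupJoinWithState() {
           // Only the streaming variant provides a real implementation.
           return "sync-lookup-join-with-state";
       }
   }

   class BatchLookupJoin extends LookupJoinBase {
       // Deliberately no override: a batch lookup join has no state variant,
       // so any call reaches the throwing default in the base class.
   }
   ```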



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.table.planner.utils.JsonTestUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for configuring operator-level state TTL via {@link
+ * org.apache.flink.table.api.CompiledPlan}.
+ */
+public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
+
+    @Test
+    public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
+        String dataId =
+                TestValuesTableFactory.registerRowData(
+                        Arrays.asList(
+                                GenericRowData.of(1, StringData.fromString("Tom"), 1, 199.9d),
+                                GenericRowData.of(2, StringData.fromString("Jerry"), 2, 99.9d),
+                                GenericRowData.of(1, StringData.fromString("Tom"), 1, 199.9d),
+                                GenericRowData.of(3, StringData.fromString("Tom"), 1, 29.9d),
+                                GenericRowData.of(4, StringData.fromString("Olivia"), 1, 100d),
+                                GenericRowData.of(4, StringData.fromString("Olivia"), 1, 100d),
+                                GenericRowData.of(2, StringData.fromString("Jerry"), 2, 99.9d),
+                                GenericRowData.of(5, StringData.fromString("Michael"), 3, 599.9d),
+                                GenericRowData.of(6, StringData.fromString("Olivia"), 3, 1000d)));
+        createTestSourceTable(
+                "Orders",
+                new String[] {
+                    "`order_id` INT", "`buyer` STRING", "`quantity` INT", "`amount` DOUBLE"
+                },
+                null,
+                getProperties(dataId, 1, "2s"));
+
+        createTestNonInsertOnlyValuesSinkTable(
+                "OrdersStats",
+                "`buyer` STRING",
+                "`ord_cnt` BIGINT",
+                "`quantity_cnt` BIGINT",
+                "`total_amount` DOUBLE");
+        compileSqlAndExecutePlan(
+                        "INSERT INTO OrdersStats \n"
+                                + "SELECT buyer, COUNT(1) AS ord_cnt, SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount FROM (\n"
+                                + "SELECT *, ROW_NUMBER() OVER(PARTITION BY order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders) tmp\n"
+                                + "WHERE rk = 1\n"
+                                + "GROUP BY buyer",
+                        json -> {
+                            try {
+                                JsonNode target = JsonTestUtils.readFromString(json);
+                                JsonTestUtils.setExecNodeStateMetadata(
+                                        target, "stream-exec-deduplicate", 0, 6000L);
+                                JsonTestUtils.setExecNodeStateMetadata(

Review Comment:
   I'm wondering whether it would be better to change it to `8000L`.
   The reason is that if we remove this code line, the test still passes, so we can't be sure that setting the state TTL for `stream-exec-group-aggregate` actually makes a difference.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonPlanTestBase.java:
##########
@@ -67,11 +68,18 @@ public void after() {
     }
 
     protected TableResult compileSqlAndExecutePlan(String sql) {
+        return compileSqlAndExecutePlan(sql, json -> json);
+    }
+
+    protected TableResult compileSqlAndExecutePlan(
+            String sql, Function<String, String> planJsonTransformer) {

Review Comment:
   nit:
   `jsonPlanTransformer`?
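   The overload pattern in this diff can be sketched stand-alone (the class name and the fake "plan" string are illustrative, not Flink's API): the plain overload delegates with the identity function, so existing callers stay unchanged while tests can pass a transformer that rewrites the JSON plan.

   ```java
   import java.util.function.Function;

   // Sketch only: identity-default overload keeps the old call sites working
   // while exposing a hook for tests to mutate the serialized plan.
   class PlanCompilerSketch {
       String compileSqlAndExecutePlan(String sql) {
           return compileSqlAndExecutePlan(sql, json -> json);
       }

       String compileSqlAndExecutePlan(String sql, Function<String, String> jsonPlanTransformer) {
           String jsonPlan = "{\"sql\":\"" + sql + "\"}"; // stand-in for the compiled plan
           return jsonPlanTransformer.apply(jsonPlan);
       }
   }
   ```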



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java:
##########
@@ -71,20 +75,36 @@
         producedTransformations = StreamExecChangelogNormalize.CHANGELOG_NORMALIZE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
+@ExecNodeMetadata(
+        name = "stream-exec-changelog-normalize",
+        version = 2,

Review Comment:
   After reading the doc of the method `minPlanVersion`:
   > Needs to be updated when the JSON for the {@link ExecNode} changes: e.g. after adding an attribute to the JSON spec of the ExecNode.
   
   I think we need to introduce a new version, since the JSON for this node changes.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
##########
@@ -84,22 +84,23 @@ public StateMetadata(
                 stateIndex,
                 TimeUtils.parseDuration(
                         Preconditions.checkNotNull(stateTtl, "state ttl should not be null")),
-                Preconditions.checkNotNull(stateName, "state name should not be null"));
+                stateName);
     }
 
-    public StateMetadata(int stateIndex, @Nonnull Duration stateTtl, @Nonnull String stateName) {
+    public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
         Preconditions.checkArgument(stateIndex >= 0, "state index should start from 0");
         this.stateIndex = stateIndex;
-        this.stateTtl = stateTtl;
-        this.stateName = stateName;
+        this.stateTtl = Preconditions.checkNotNull(stateTtl, "state ttl should not be null");
+        this.stateName = Preconditions.checkNotNull(stateName, "state name should not be null");
     }
 
     public int getStateIndex() {
         return stateIndex;
     }
 
-    public long getStateTtl() {
-        return stateTtl.toMillis();
+    @JsonGetter(value = "ttl")

Review Comment:
   nit:
   use `FIELD_NAME_STATE_TTL`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java:
##########
@@ -91,6 +107,7 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
     public static final String FIELD_NAME_IS_ROWTIME = "isRowtime";
     public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
     public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = "generateUpdateBefore";
+    public static final String STATE_NAME = "deduplicateState";

Review Comment:
   Just to double-check: from the example in [FLIP-292](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240883951), it seems the name should be `deduplicate-state`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
