LadyForest commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1465877429


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java:
##########
@@ -83,21 +85,50 @@ final List<RelNode> resolve(List<RelNode> roots) {
     }
 
     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set<RelHint> existentKVHints = new HashSet<>();
-
-        List<RelHint> oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List<RelHint> oldHints = ((Hintable) node).getHints();
         List<RelHint> oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }
 
-        List<RelHint> newHints = new ArrayList<>();
+        final List<RelHint> newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional<String> leftName = 
extractAliasOrTableName(biRel.getLeft());
+            Optional<String> rightName = 
extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, 
oldHints);

Review Comment:
   Nit: I think `validateAndGetNewHints` is good enough, and it's ok to have an 
overloaded method here. Another point is I don't think passing `Optional<T>` as 
arguments is a good practice.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -52,10 +52,13 @@ public abstract class FlinkHints {
     // ~ Internal alias tag hint
     public static final String HINT_ALIAS = "ALIAS";
 
-    // ~ Option name for hints on join or correlate
+    // ~ Option name for hints on BiRel like join or correlate
     public static final String LEFT_INPUT = "LEFT";
     public static final String RIGHT_INPUT = "RIGHT";
 
+    // ~ Option name for hints on SingleRel like aggregate
+    public static final String INPUT = "INPUT";

Review Comment:
   Is it more natural to convert the KV options to list options for agg?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java:
##########
@@ -122,8 +122,9 @@ public static HintStrategyTable createHintStrategyTable() {
                                 .build())
                 .hintStrategy(
                         StateTtlHint.STATE_TTL.getHintName(),
-                        // TODO support agg state ttl hint
-                        HintStrategy.builder(HintPredicates.JOIN)
+                        HintStrategy.builder(

Review Comment:
   Nit: this is a comment on `STATE_TTL_NON_EMPTY_KV_OPTION_CHECKER`.
   We'd better keep a unified error message template with other hints.
   
   Suggested changes
   ```java
    litmus.check(
                           ttlHint.kvOptions.size() > 0,
                           "Invalid STATE_TTL hint, expecting at least one 
key-value options specified.");
   ```
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java:
##########
@@ -87,4 +91,28 @@ public static Map<Integer, Long> 
getStateTtlFromHint(List<RelHint> hints) {
 
         return stateTtlFromHint;
     }
+
+    /**
+     * Get the state ttl from hints on the {@link 
org.apache.calcite.rel.SingleRel} such as
+     * Aggregate.
+     *
+     * @return the state ttl in milliseconds. If no state ttl hints set from 
hint, return "null".
+     */
+    @Nullable
+    public static Long getStateTtlFromHintOnSingleRel(List<RelHint> hints) {
+        AtomicReference<Long> stateTtlFromHint = new AtomicReference<>(null);
+        hints.stream()
+                .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
+                .forEach(
+                        hint ->
+                                hint.kvOptions.forEach(
+                                        (input, ttl) -> {
+                                            if 
(FlinkHints.INPUT.equals(input)) {
+                                                stateTtlFromHint.set(
+                                                        
TimeUtils.parseDuration(ttl).toMillis());
+                                            }
+                                        }));
+
+        return stateTtlFromHint.get();

Review Comment:
   ```suggestion
           return hints.stream()
                   .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
                   .flatMap(hint -> hint.listOptions.stream())
                   .map(ttl -> TimeUtils.parseDuration(ttl).toMillis())
                   .collect(Collectors.toList())
                   .get(0);
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java:
##########
@@ -228,6 +290,19 @@ && matchIdentifier(input, rightName.get())) {
         return newOptions;
     }
 
+    private Map<String, String> getNewStateTtlHintOptions(
+            Optional<String> inputName, Map<String, String> kvOptions, String 
hintName) {
+        updateInfoForOptionCheck(hintName, inputName);
+        Map<String, String> newOptions = new HashMap<>();
+        kvOptions.forEach(
+                (input, ttl) -> {
+                    if (inputName.isPresent() && matchIdentifier(input, 
inputName.get())) {
+                        newOptions.put(FlinkHints.INPUT, ttl);
+                    }
+                });
+        return newOptions;

Review Comment:
   How about to return list options?
   ```suggestion
           Preconditions.checkArgument(kvOptions.size() == 1);
           updateInfoForOptionCheck(hintName, inputName);
           return kvOptions.entrySet().stream()
                   .filter(
                           entry ->
                                   inputName.isPresent()
                                           && matchIdentifier(entry.getKey(), 
inputName.get()))
                   .map(Map.Entry::getValue)
                   .collect(Collectors.toList());
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala:
##########
@@ -123,6 +130,7 @@ class StreamPhysicalGlobalGroupAggregate(
       generateUpdateBefore,
       needRetraction,
       indexOfCountStar.map(Integer.valueOf).orNull,
+      StateTtlHint.getStateTtlFromHintOnSingleRel(hints),

Review Comment:
   This is commented on `explainTerms`.
   Before `STATE_TTL` hint is introduced, the rest query hints(mainly join 
hints for batch and lookup join hints for stream) are converted and use `itemIf`
   So, I rethink the explanation format. How about
   ```scala
   // for agg
         .itemIf(
           "stateTtl",
           StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
           hints.stream().anyMatch(hint => 
StateTtlHint.isStateTtlHint(hint.hintName)))
   
   // which yields
   // stateTtl=[1d]
   
   // for regular join
   val ttlHint = StateTtlHint.getStateTtlFromHintOnBiRel(getHints)
         .itemIf("leftStateTtl", ttlHint.get(0), ttlHint.containsKey(0))
         .itemIf("rightStateTtl", ttlHint.get(1), ttlHint.containsKey(1))
   // which yields
   // leftStateTtl=[1d], rightStateTtl=[3d]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to