This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f32b2a9e347 [FLINK-34053][table-planner] Support state ttl hint for 
group aggregate
f32b2a9e347 is described below

commit f32b2a9e347d7539819a88252e4e32deba247515
Author: Xuyang <[email protected]>
AuthorDate: Tue Jan 30 09:31:17 2024 +0800

    [FLINK-34053][table-planner] Support state ttl hint for group aggregate
    
    This closes #24179
---
 .../planner/calcite/RelTimeIndicatorConverter.java |   3 +-
 .../planner/hint/CapitalizeQueryHintsShuttle.java  |  11 +-
 ...earQueryHintsWithInvalidPropagationShuttle.java |  35 ++-
 .../table/planner/hint/FlinkHintStrategies.java    |   7 +-
 .../flink/table/planner/hint/FlinkHints.java       |  11 +-
 .../table/planner/hint/QueryHintsRelShuttle.java   |  13 +-
 .../flink/table/planner/hint/StateTtlHint.java     |  26 +-
 .../planner/plan/nodes/exec/StateMetadata.java     |   9 +-
 .../stream/StreamExecGlobalGroupAggregate.java     |   4 +-
 .../exec/stream/StreamExecGroupAggregate.java      |   4 +-
 .../StreamExecIncrementalGroupAggregate.java       |   4 +-
 .../planner/plan/optimize/QueryHintsResolver.java  | 142 +++++++--
 .../planner/calcite/FlinkLogicalRelFactories.scala |   2 +-
 .../plan/nodes/logical/FlinkLogicalAggregate.scala |  45 ++-
 .../StreamPhysicalGlobalGroupAggregate.scala       |  14 +-
 .../stream/StreamPhysicalGroupAggregate.scala      |  13 +-
 .../stream/StreamPhysicalGroupAggregateBase.scala  |   8 +-
 .../StreamPhysicalIncrementalGroupAggregate.scala  |  14 +-
 .../nodes/physical/stream/StreamPhysicalJoin.scala |   2 +-
 .../plan/rules/logical/SplitAggregateRule.scala    |   6 +-
 .../physical/stream/IncrementalAggregateRule.scala |   7 +-
 .../stream/StreamPhysicalGroupAggregateRule.scala  |   3 +-
 .../stream/TwoStageOptimizedAggregateRule.scala    |   4 +-
 .../planner/plan/utils/RelTreeWriterImpl.scala     |  14 +-
 ...HintsWithInvalidPropagationShuttleTestBase.java |  13 +
 ...eTtlHintsWithInvalidPropagationShuttleTest.java |  74 ++++-
 .../plan/hints/stream/StateTtlHintTest.java        | 103 ++++++-
 .../plan/nodes/exec/serde/StateMetadataTest.java   |  18 +-
 .../exec/stream/GroupAggregateRestoreTest.java     |   3 +-
 .../exec/stream/GroupAggregateTestPrograms.java    |  29 ++
 .../ConfigureOperatorLevelStateTtlJsonITCase.java  |  30 +-
 ...teTtlHintsWithInvalidPropagationShuttleTest.xml |  92 +++++-
 .../planner/plan/hints/stream/StateTtlHintTest.xml | 329 +++++++++++++++++----
 .../plan/agg-with-state-ttl-hint.json              | 272 +++++++++++++++++
 .../agg-with-state-ttl-hint/savepoint/_metadata    | Bin 0 -> 11151 bytes
 35 files changed, 1187 insertions(+), 177 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index c8004ef1867..97a073ac999 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -470,7 +470,8 @@ public final class RelTimeIndicatorConverter extends 
RelHomogeneousShuttle {
                         tableAgg.getInput(),
                         tableAgg.getGroupSet(),
                         tableAgg.getGroupSets(),
-                        tableAgg.getAggCallList());
+                        tableAgg.getAggCallList(),
+                        Collections.emptyList());
         FlinkLogicalAggregate convertedAgg = visitAggregate(correspondingAgg);
         return new FlinkLogicalTableAggregate(
                 tableAgg.getCluster(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
index 3acbf3fa34a..006dab4b832 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.hint;
 
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
@@ -32,11 +31,11 @@ import java.util.stream.Collectors;
 public class CapitalizeQueryHintsShuttle extends QueryHintsRelShuttle {
 
     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Hintable hBiRel = (Hintable) biRel;
+    protected RelNode doVisit(RelNode node) {
+        Hintable hNode = (Hintable) node;
         AtomicBoolean changed = new AtomicBoolean(false);
         List<RelHint> hintsWithCapitalJoinHints =
-                hBiRel.getHints().stream()
+                hNode.getHints().stream()
                         .map(
                                 hint -> {
                                     String capitalHintName = 
hint.hintName.toUpperCase(Locale.ROOT);
@@ -69,9 +68,9 @@ public class CapitalizeQueryHintsShuttle extends 
QueryHintsRelShuttle {
                         .collect(Collectors.toList());
 
         if (changed.get()) {
-            return super.visit(hBiRel.withHints(hintsWithCapitalJoinHints));
+            return super.visit(hNode.withHints(hintsWithCapitalJoinHints));
         } else {
-            return super.visit(biRel);
+            return super.visit(node);
         }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
index 8f2d8406cb7..66973683907 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.planner.hint;
 
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.sql.SqlNode;
@@ -50,15 +50,15 @@ import java.util.stream.Collectors;
 public class ClearQueryHintsWithInvalidPropagationShuttle extends 
QueryHintsRelShuttle {
 
     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        List<RelHint> hints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List<RelHint> hints = ((Hintable) node).getHints();
 
         Set<String> allHintNames =
                 hints.stream().map(hint -> 
hint.hintName).collect(Collectors.toSet());
 
         // there are no query hints on this Join/Correlate node
         if (allHintNames.stream().noneMatch(FlinkHints::isQueryHint)) {
-            return super.visit(biRel);
+            return super.visit(node);
         }
 
         Optional<RelHint> firstAliasHint =
@@ -66,9 +66,9 @@ public class ClearQueryHintsWithInvalidPropagationShuttle 
extends QueryHintsRelS
                         .filter(hint -> 
FlinkHints.HINT_ALIAS.equals(hint.hintName))
                         .findFirst();
 
-        // there are no alias hints on this Join/Correlate node
+        // there are no alias hints on this Join/Correlate/Aggregate node
         if (!firstAliasHint.isPresent()) {
-            return super.visit(biRel);
+            return super.visit(node);
         }
 
         List<RelHint> queryHintsFromOuterQueryBlock =
@@ -84,10 +84,10 @@ public class ClearQueryHintsWithInvalidPropagationShuttle 
extends QueryHintsRelS
                         .collect(Collectors.toList());
 
         if (queryHintsFromOuterQueryBlock.isEmpty()) {
-            return super.visit(biRel);
+            return super.visit(node);
         }
 
-        RelNode newRelNode = biRel;
+        RelNode newRelNode = node;
         ClearOuterQueryHintShuttle clearOuterQueryHintShuttle;
 
         for (RelHint outerQueryHint : queryHintsFromOuterQueryBlock) {
@@ -128,26 +128,31 @@ public class ClearQueryHintsWithInvalidPropagationShuttle 
extends QueryHintsRelS
 
         @Override
         public RelNode visit(LogicalCorrelate correlate) {
-            return visitBiRel(correlate);
+            return doVisit(correlate);
         }
 
         @Override
         public RelNode visit(LogicalJoin join) {
-            return visitBiRel(join);
+            return doVisit(join);
         }
 
-        private RelNode visitBiRel(BiRel biRel) {
-            Hintable hBiRel = (Hintable) biRel;
-            List<RelHint> hints = new ArrayList<>(hBiRel.getHints());
+        @Override
+        public RelNode visit(LogicalAggregate aggregate) {
+            return doVisit(aggregate);
+        }
+
+        private RelNode doVisit(RelNode node) {
+            Hintable hNode = (Hintable) node;
+            List<RelHint> hints = new ArrayList<>(hNode.getHints());
             Optional<RelHint> invalidQueryHint = getInvalidQueryHint(hints);
 
             // if this node contains the query hint that needs to be removed
             if (invalidQueryHint.isPresent()) {
                 hints.remove(invalidQueryHint.get());
-                return super.visit(hBiRel.withHints(hints));
+                return super.visit(hNode.withHints(hints));
             }
 
-            return super.visit(biRel);
+            return super.visit(node);
         }
 
         /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
index 8b04002d1ab..afdda70ce41 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
@@ -122,8 +122,9 @@ public abstract class FlinkHintStrategies {
                                 .build())
                 .hintStrategy(
                         StateTtlHint.STATE_TTL.getHintName(),
-                        // TODO support agg state ttl hint
-                        HintStrategy.builder(HintPredicates.JOIN)
+                        HintStrategy.builder(
+                                        HintPredicates.or(
+                                                HintPredicates.JOIN, 
HintPredicates.AGGREGATE))
                                 
.optionChecker(STATE_TTL_NON_EMPTY_KV_OPTION_CHECKER)
                                 .build())
                 .build();
@@ -252,7 +253,7 @@ public abstract class FlinkHintStrategies {
 
                 litmus.check(
                         ttlHint.kvOptions.size() > 0,
-                        "Invalid hint about STATE_TTL, expecting at least one 
key-value options specified.");
+                        "Invalid STATE_TTL hint, expecting at least one 
key-value options specified.");
 
                 // validate the hint value
                 ttlHint.kvOptions
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index dde8a97c2bd..18f13d75472 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -52,7 +52,7 @@ public abstract class FlinkHints {
     // ~ Internal alias tag hint
     public static final String HINT_ALIAS = "ALIAS";
 
-    // ~ Option name for hints on join or correlate
+    // ~ Option name for hints on BiRel like join or correlate
     public static final String LEFT_INPUT = "LEFT";
     public static final String RIGHT_INPUT = "RIGHT";
 
@@ -267,11 +267,8 @@ public abstract class FlinkHints {
         return JoinStrategy.isJoinStrategy(hintName) || 
StateTtlHint.isStateTtlHint(hintName);
     }
 
-    /**
-     * Currently, lookup join hints and state ttl hints are KV hints. And 
regular join hints are
-     * LIST hints.
-     */
-    public static boolean isKVQueryHint(String hintName) {
-        return JoinStrategy.isLookupHint(hintName) || 
StateTtlHint.isStateTtlHint(hintName);
+    /** Check if the hint is an alias hint. */
+    public static boolean isAliasHint(String hintName) {
+        return FlinkHints.HINT_ALIAS.equalsIgnoreCase(hintName);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
index d2a0a47053d..7c02bb0b756 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.table.planner.hint;
 
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -37,15 +37,20 @@ public abstract class QueryHintsRelShuttle extends 
RelShuttleImpl {
         if (containsSubQuery(join)) {
             join = (LogicalJoin) resolveSubQuery(join, relNode -> 
relNode.accept(this));
         }
-        return visitBiRel(join);
+        return doVisit(join);
     }
 
     @Override
     public RelNode visit(LogicalCorrelate correlate) {
-        return visitBiRel(correlate);
+        return doVisit(correlate);
     }
 
-    protected abstract RelNode visitBiRel(BiRel biRel);
+    @Override
+    public RelNode visit(LogicalAggregate aggregate) {
+        return doVisit(aggregate);
+    }
+
+    protected abstract RelNode doVisit(RelNode node);
 
     @Override
     public RelNode visit(LogicalFilter filter) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
index e166ccaf98f..db1896d0d0f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
@@ -22,9 +22,12 @@ import org.apache.flink.util.TimeUtils;
 
 import org.apache.calcite.rel.hint.RelHint;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Hint strategy to configure different {@link
@@ -61,12 +64,13 @@ public enum StateTtlHint {
     }
 
     /**
-     * Get the state ttl from hints.
+     * Get the state ttl from hints on the {@link 
org.apache.calcite.rel.BiRel} such as Join and
+     * Correlate.
      *
      * @return The key of the map is the input side. The value of the map is 
the state ttl in
      *     milliseconds.
      */
-    public static Map<Integer, Long> getStateTtlFromHint(List<RelHint> hints) {
+    public static Map<Integer, Long> getStateTtlFromHintOnBiRel(List<RelHint> 
hints) {
         Map<Integer, Long> stateTtlFromHint = new java.util.HashMap<>();
         hints.stream()
                 .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
@@ -87,4 +91,22 @@ public enum StateTtlHint {
 
         return stateTtlFromHint;
     }
+
+    /**
+     * Get the state ttl from hints on the {@link 
org.apache.calcite.rel.SingleRel} such as
+     * Aggregate.
+     *
+     * @return the state ttl in milliseconds. If no state ttl is set from 
hints, return "null".
+     */
+    @Nullable
+    public static Long getStateTtlFromHintOnSingleRel(List<RelHint> hints) {
+        List<Long> allStateTtl =
+                hints.stream()
+                        .filter(hint -> 
StateTtlHint.isStateTtlHint(hint.hintName))
+                        .flatMap(hint -> hint.listOptions.stream())
+                        .map(ttl -> TimeUtils.parseDuration(ttl).toMillis())
+                        .collect(Collectors.toList());
+
+        return allStateTtl.isEmpty() ? null : allStateTtl.get(0);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
index a51b95446e1..d37aeb449a5 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
@@ -106,10 +106,17 @@ public class StateMetadata {
 
     public static List<StateMetadata> getOneInputOperatorDefaultMeta(
             ReadableConfig tableConfig, String stateName) {
+        return getOneInputOperatorDefaultMeta(null, tableConfig, stateName);
+    }
+
+    public static List<StateMetadata> getOneInputOperatorDefaultMeta(
+            @Nullable Long stateTtlFromHint, ReadableConfig tableConfig, 
String stateName) {
         return Collections.singletonList(
                 new StateMetadata(
                         0,
-                        
tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION),
+                        stateTtlFromHint == null
+                                ? 
tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
+                                : Duration.ofMillis(stateTtlFromHint),
                         stateName));
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 37809b9cc92..95f036d06e5 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -133,6 +133,7 @@ public class StreamExecGlobalGroupAggregate extends 
StreamExecAggregateBase {
             boolean generateUpdateBefore,
             boolean needRetraction,
             @Nullable Integer indexOfCountStar,
+            @Nullable Long stateTtlFromHint,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
@@ -148,7 +149,8 @@ public class StreamExecGlobalGroupAggregate extends 
StreamExecAggregateBase {
                 generateUpdateBefore,
                 needRetraction,
                 indexOfCountStar,
-                StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, 
STATE_NAME),
+                StateMetadata.getOneInputOperatorDefaultMeta(
+                        stateTtlFromHint, tableConfig, STATE_NAME),
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index cdcb725aca9..cb5a25ed913 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -120,6 +120,7 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
             boolean[] aggCallNeedRetractions,
             boolean generateUpdateBefore,
             boolean needRetraction,
+            @Nullable Long stateTtlFromHint,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
@@ -132,7 +133,8 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
                 aggCallNeedRetractions,
                 generateUpdateBefore,
                 needRetraction,
-                StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, 
STATE_NAME),
+                StateMetadata.getOneInputOperatorDefaultMeta(
+                        stateTtlFromHint, tableConfig, STATE_NAME),
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index 36d20f764d4..fb7d767536b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -126,6 +126,7 @@ public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
             boolean[] partialAggCallNeedRetractions,
             RowType partialLocalAggInputType,
             boolean partialAggNeedRetraction,
+            @Nullable Long stateTtlFromHint,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
@@ -140,7 +141,8 @@ public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
                 partialAggCallNeedRetractions,
                 partialLocalAggInputType,
                 partialAggNeedRetraction,
-                StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, 
STATE_NAME),
+                StateMetadata.getOneInputOperatorDefaultMeta(
+                        stateTtlFromHint, tableConfig, STATE_NAME),
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
index b3b08fd1fef..8e08933db12 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.optimize;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.hint.JoinStrategy;
@@ -27,6 +28,7 @@ import org.apache.flink.table.planner.hint.StateTtlHint;
 
 import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.hint.RelHint;
@@ -83,21 +85,50 @@ public class QueryHintsResolver extends 
QueryHintsRelShuttle {
     }
 
     @Override
-    protected RelNode visitBiRel(BiRel biRel) {
-        Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
-        Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
-
-        Set<RelHint> existentKVHints = new HashSet<>();
-
-        List<RelHint> oldHints = ((Hintable) biRel).getHints();
+    protected RelNode doVisit(RelNode node) {
+        List<RelHint> oldHints = ((Hintable) node).getHints();
         List<RelHint> oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
         // has no hints, return directly.
         if (oldQueryHints.isEmpty()) {
-            return super.visitChildren(biRel);
+            return super.visitChildren(node);
         }
 
-        List<RelHint> newHints = new ArrayList<>();
+        final List<RelHint> newHints;
+        if (node instanceof BiRel) {
+            BiRel biRel = (BiRel) node;
+            Optional<String> leftName = 
extractAliasOrTableName(biRel.getLeft());
+            Optional<String> rightName = 
extractAliasOrTableName(biRel.getRight());
+            newHints = validateAndGetNewHints(leftName, rightName, oldHints);
+        } else if (node instanceof SingleRel) {
+            SingleRel singleRel = (SingleRel) node;
+            Optional<String> tableName = 
extractAliasOrTableName(singleRel.getInput());
+            newHints = validateAndGetNewHints(tableName, oldHints);
+        } else {
+            throw new TableException(
+                    String.format(
+                            "Unsupported node when resolving query hints: %s",
+                            node.getClass().getCanonicalName()));
+        }
+
+        RelNode newNode = super.visitChildren(node);
+        List<RelHint> mergedHints = mergeQueryHintsIfNecessary(newHints);
+        // replace new query hints
+        return ((Hintable) newNode).withHints(mergedHints);
+    }
+
+    /**
+     * Resolve the query hints in the {@link BiRel} such as {@link
+     * org.apache.calcite.rel.core.Correlate} and {@link 
org.apache.calcite.rel.core.Join}.
+     *
+     * @param leftName left table name, view name or alias name
+     * @param rightName right table name, view name or alias name
+     * @param oldHints old hints in this node
+     */
+    private List<RelHint> validateAndGetNewHints(
+            Optional<String> leftName, Optional<String> rightName, 
List<RelHint> oldHints) {
+        Set<RelHint> existentKVHints = new HashSet<>();
 
+        List<RelHint> newHints = new ArrayList<>();
         for (RelHint hint : oldHints) {
             if (JoinStrategy.isLookupHint(hint.hintName)) {
                 allHints.add(trimInheritPath(hint));
@@ -156,12 +187,43 @@ public class QueryHintsResolver extends 
QueryHintsRelShuttle {
                 }
             }
         }
+        return newHints;
+    }
 
-        RelNode newNode = super.visitChildren(biRel);
+    /**
+     * Resolve the query hints in the {@link SingleRel} such as {@link
+     * org.apache.calcite.rel.core.Aggregate}.
+     *
+     * @param inputName the input table name, view name or alias name
+     * @param oldHints old hints in this node
+     */
+    private List<RelHint> validateAndGetNewHints(
+            Optional<String> inputName, List<RelHint> oldHints) {
+        Set<RelHint> existentKVHints = new HashSet<>();
 
-        newHints = mergeQueryHintsIfNecessary(newHints);
-        // replace new query hints
-        return ((Hintable) newNode).withHints(newHints);
+        List<RelHint> newHints = new ArrayList<>();
+
+        for (RelHint hint : oldHints) {
+            if (StateTtlHint.isStateTtlHint(hint.hintName)) {
+                List<String> definedTables = new 
ArrayList<>(hint.kvOptions.keySet());
+                initOptionInfoAboutQueryHintsForCheck(hint.hintName, 
definedTables);
+                // the kv options will be converted to list options
+                List<String> newListOptions =
+                        getNewStateTtlHintOptions(inputName, hint.kvOptions, 
hint.hintName);
+                if (!newListOptions.isEmpty()) {
+                    // only accept a matched hint
+                    validHints.add(trimInheritPath(hint));
+                    newHints.add(
+                            
RelHint.builder(hint.hintName).hintOptions(newListOptions).build());
+                }
+            } else {
+                if (!existentKVHints.contains(hint)) {
+                    existentKVHints.add(hint);
+                    newHints.add(hint);
+                }
+            }
+        }
+        return newHints;
     }
 
     private List<String> getNewJoinHintOptions(
@@ -228,6 +290,19 @@ public class QueryHintsResolver extends 
QueryHintsRelShuttle {
         return newOptions;
     }
 
+    /** The state ttl hint for {@link SingleRel} will be converted to a list 
option. */
+    private List<String> getNewStateTtlHintOptions(
+            Optional<String> inputName, Map<String, String> kvOptions, String 
hintName) {
+        updateInfoForOptionCheck(hintName, inputName);
+        return kvOptions.entrySet().stream()
+                .filter(
+                        entry ->
+                                inputName.isPresent()
+                                        && matchIdentifier(entry.getKey(), 
inputName.get()))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+    }
+
     private void validateHints() {
         Set<RelHint> invalidHints = new HashSet<>(allHints);
         invalidHints.removeAll(validHints);
@@ -389,32 +464,53 @@ public class QueryHintsResolver extends 
QueryHintsRelShuttle {
     private List<RelHint> mergeQueryHintsIfNecessary(List<RelHint> hints) {
         List<RelHint> result = new ArrayList<>();
         Map<String, Map<String, String>> kvHintsMap = new HashMap<>();
+        Map<String, String> listHintsMap = new HashMap<>();
 
         for (RelHint hint : hints) {
             String hintName = hint.hintName;
 
-            // if the hint is not KV hint, add it directly
-            if (!FlinkHints.isKVQueryHint(hintName)) {
+            // join hints and alias hints are not merged: keep each one as it is
+            if (JoinStrategy.isJoinStrategy(hintName) || 
FlinkHints.isAliasHint(hintName)) {
                 result.add(hint);
                 continue;
             }
 
-            // if the hint is KV hint, merge it with the existing hints
-            Map<String, String> kvOptions = new HashMap<>(hint.kvOptions);
-            if (kvHintsMap.containsKey(hintName)) {
-                Map<String, String> existingOptions = kvHintsMap.get(hintName);
-                for (String key : kvOptions.keySet()) {
-                    // if the key is same, choose the first hint to take effect
-                    existingOptions.computeIfAbsent(key, k -> 
kvOptions.get(key));
+            if (!hint.kvOptions.isEmpty()) {
+                // KV hint (e.g. lookup hint, or state ttl hint on a BiRel): merge its options into
+                // the options already collected under the same hint name
+                Map<String, String> kvOptions = new HashMap<>(hint.kvOptions);
+                if (kvHintsMap.containsKey(hintName)) {
+                    Map<String, String> existingOptions = 
kvHintsMap.get(hintName);
+                    for (String key : kvOptions.keySet()) {
+                        // for a key defined more than once, the first hint seen wins
+                        existingOptions.computeIfAbsent(key, k -> 
kvOptions.get(key));
+                    }
+                } else {
+                    kvHintsMap.put(hintName, kvOptions);
                 }
+            } else if (!hint.listOptions.isEmpty()) {
+                // LIST hint (e.g. state ttl hint on a SingleRel): keep only the first option of
+                // the first hint seen under this hint name
+                listHintsMap.computeIfAbsent(hintName, k -> 
hint.listOptions.get(0));
             } else {
-                kvHintsMap.put(hintName, kvOptions);
+                // fail explicitly on empty options, even though the checkers in
+                // FlinkHintStrategies may already have rejected them earlier
+                throw new ValidationException(
+                        String.format(
+                                "Invalid %s hint, the key-value options and 
list options are all empty",
+                                hintName));
             }
         }
 
         for (String kvHintName : kvHintsMap.keySet()) {
             
result.add(RelHint.builder(kvHintName).hintOptions(kvHintsMap.get(kvHintName)).build());
         }
+        for (String listHintName : listHintsMap.keySet()) {
+            result.add(
+                    RelHint.builder(listHintName)
+                            
.hintOptions(Collections.singletonList(listHintsMap.get(listHintName)))
+                            .build());
+        }
         return result;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
index 652a177bb98..21a8476b77b 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
@@ -133,7 +133,7 @@ object FlinkLogicalRelFactories {
         groupSet: ImmutableBitSet,
         groupSets: ImmutableList[ImmutableBitSet],
         aggCalls: util.List[AggregateCall]): RelNode = {
-      FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls)
+      FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls, hints)
     }
   }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
index 0c9c1f2ca03..1abd1dc59f0 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.plan.nodes.logical
 
+import org.apache.flink.table.planner.JList
 import org.apache.flink.table.planner.plan.PartialFinalType
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.utils.AggregateUtil
@@ -48,15 +49,9 @@ class FlinkLogicalAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     /* flag indicating whether to skip SplitAggregateRule */
-    var partialFinalType: PartialFinalType = PartialFinalType.NONE)
-  extends Aggregate(
-    cluster,
-    traitSet,
-    Collections.emptyList[RelHint](),
-    child,
-    groupSet,
-    groupSets,
-    aggCalls)
+    var partialFinalType: PartialFinalType = PartialFinalType.NONE,
+    hints: JList[RelHint] = Collections.emptyList())
+  extends Aggregate(cluster, traitSet, hints, child, groupSet, groupSets, 
aggCalls)
   with FlinkLogicalRel {
 
   def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -76,7 +71,9 @@ class FlinkLogicalAggregate(
       groupSet,
       groupSets,
       aggCalls,
-      partialFinalType)
+      partialFinalType,
+      getHints
+    )
   }
 
   override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): 
RelOptCost = {
@@ -118,7 +115,12 @@ private class FlinkLogicalAggregateBatchConverter(config: 
ConverterRule.Config)
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalAggregate]
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
-    FlinkLogicalAggregate.create(newInput, agg.getGroupSet, agg.getGroupSets, 
agg.getAggCallList)
+    FlinkLogicalAggregate.create(
+      newInput,
+      agg.getGroupSet,
+      agg.getGroupSets,
+      agg.getAggCallList,
+      agg.getHints)
   }
 }
 
@@ -139,7 +141,12 @@ private class FlinkLogicalAggregateStreamConverter(config: 
ConverterRule.Config)
   override def convert(rel: RelNode): RelNode = {
     val agg = rel.asInstanceOf[LogicalAggregate]
     val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
-    FlinkLogicalAggregate.create(newInput, agg.getGroupSet, agg.getGroupSets, 
agg.getAggCallList)
+    FlinkLogicalAggregate.create(
+      newInput,
+      agg.getGroupSet,
+      agg.getGroupSets,
+      agg.getAggCallList,
+      agg.getHints)
   }
 }
 
@@ -168,9 +175,19 @@ object FlinkLogicalAggregate {
       input: RelNode,
       groupSet: ImmutableBitSet,
       groupSets: util.List[ImmutableBitSet],
-      aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
+      aggCalls: util.List[AggregateCall],
+      hints: JList[RelHint]): FlinkLogicalAggregate = {
     val cluster = input.getCluster
     val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
-    new FlinkLogicalAggregate(cluster, traitSet, input, groupSet, groupSets, 
aggCalls)
+    new FlinkLogicalAggregate(
+      cluster,
+      traitSet,
+      input,
+      groupSet,
+      groupSets,
+      aggCalls,
+      PartialFinalType.NONE,
+      hints
+    )
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
index d98eb3c8435..0ae4f46b891 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.planner.JList
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.hint.StateTtlHint
 import org.apache.flink.table.planner.plan.PartialFinalType
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate
@@ -28,6 +30,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.hint.RelHint
+
+import java.util.Collections
 
 /**
  * Stream physical RelNode for unbounded global group aggregate.
@@ -46,8 +51,9 @@ class StreamPhysicalGlobalGroupAggregate(
     val localAggInputRowType: RelDataType,
     val needRetraction: Boolean,
     val partialFinalType: PartialFinalType,
-    indexOfCountStar: Option[Int] = Option.empty)
-  extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, 
grouping, aggCalls) {
+    indexOfCountStar: Option[Int] = Option.empty,
+    hints: JList[RelHint] = Collections.emptyList())
+  extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, 
grouping, aggCalls, hints) {
 
   // if the indexOfCountStar is valid, the needRetraction should be true
   require(indexOfCountStar.isEmpty || indexOfCountStar.get >= 0 && 
needRetraction)
@@ -90,7 +96,8 @@ class StreamPhysicalGlobalGroupAggregate(
       localAggInputRowType,
       needRetraction,
       partialFinalType,
-      indexOfCountStar)
+      indexOfCountStar,
+      hints)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -123,6 +130,7 @@ class StreamPhysicalGlobalGroupAggregate(
       generateUpdateBefore,
       needRetraction,
       indexOfCountStar.map(Integer.valueOf).orNull,
+      StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
index e5c73784f3a..cab8c770d66 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.planner.JList
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.hint.StateTtlHint
 import org.apache.flink.table.planner.plan.PartialFinalType
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate
@@ -28,8 +30,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.hint.RelHint
 
 import java.util
+import java.util.Collections
 
 /**
  * Stream physical RelNode for unbounded group aggregate.
@@ -46,8 +50,9 @@ class StreamPhysicalGroupAggregate(
     outputRowType: RelDataType,
     grouping: Array[Int],
     aggCalls: Seq[AggregateCall],
-    var partialFinalType: PartialFinalType = PartialFinalType.NONE)
-  extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, 
grouping, aggCalls) {
+    var partialFinalType: PartialFinalType = PartialFinalType.NONE,
+    hints: JList[RelHint] = Collections.emptyList())
+  extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, 
grouping, aggCalls, hints) {
 
   private val aggInfoList =
     AggregateUtil.deriveAggregateInfoList(this, grouping.length, aggCalls)
@@ -64,7 +69,8 @@ class StreamPhysicalGroupAggregate(
       outputRowType,
       grouping,
       aggCalls,
-      partialFinalType)
+      partialFinalType,
+      hints)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -91,6 +97,7 @@ class StreamPhysicalGroupAggregate(
       aggCallNeedRetractions,
       generateUpdateBefore,
       needRetraction,
+      StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
index 1bea3f0b01d..ca0315231ca 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
@@ -17,9 +17,14 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.planner.JList
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{RelNode, SingleRel}
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.hint.RelHint
+
+import java.util.Collections
 
 /**
  * Base stream physical RelNode for unbounded group aggregate.
@@ -44,6 +49,7 @@ abstract class StreamPhysicalGroupAggregateBase(
     traitSet: RelTraitSet,
     inputRel: RelNode,
     val grouping: Array[Int],
-    val aggCalls: Seq[AggregateCall])
+    val aggCalls: Seq[AggregateCall],
+    val hints: JList[RelHint] = Collections.emptyList())
   extends SingleRel(cluster, traitSet, inputRel)
   with StreamPhysicalRel {}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
index cd4377760bb..718e3a76ca0 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
@@ -17,7 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
+import org.apache.flink.table.planner.JList
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.hint.StateTtlHint
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate
 import org.apache.flink.table.planner.plan.utils._
@@ -27,8 +29,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.hint.RelHint
 
 import java.util
+import java.util.Collections
 
 /**
  * Stream physical RelNode for unbounded incremental group aggregate.
@@ -68,13 +72,15 @@ class StreamPhysicalIncrementalGroupAggregate(
     partialAggCallNeedRetractions: Array[Boolean],
     partialAggNeedRetraction: Boolean,
     partialLocalAggInputRowType: RelDataType,
-    partialGlobalAggRowType: RelDataType)
+    partialGlobalAggRowType: RelDataType,
+    hints: JList[RelHint] = Collections.emptyList())
   extends StreamPhysicalGroupAggregateBase(
     cluster,
     traitSet,
     inputRel,
     finalAggGrouping,
-    finalAggCalls) {
+    finalAggCalls,
+    hints) {
 
   private lazy val incrementalAggInfo = 
AggregateUtil.createIncrementalAggInfoList(
     unwrapTypeFactory(inputRel),
@@ -107,7 +113,8 @@ class StreamPhysicalIncrementalGroupAggregate(
       partialAggCallNeedRetractions,
       partialAggNeedRetraction,
       partialLocalAggInputRowType,
-      partialGlobalAggRowType)
+      partialGlobalAggRowType,
+      hints)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -137,6 +144,7 @@ class StreamPhysicalIncrementalGroupAggregate(
       partialAggCallNeedRetractions,
       FlinkTypeFactory.toLogicalRowType(partialLocalAggInputRowType),
       partialAggNeedRetraction,
+      StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index 9de16707e59..77774cd02a8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -128,7 +128,7 @@ class StreamPhysicalJoin(
       getUpsertKeys(right, joinSpec.getRightKeys),
       InputProperty.DEFAULT,
       InputProperty.DEFAULT,
-      StateTtlHint.getStateTtlFromHint(getHints),
+      StateTtlHint.getStateTtlFromHintOnBiRel(getHints),
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index 61827342614..1e8caa75d17 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -314,7 +314,8 @@ class SplitAggregateRule
       relBuilder.build(),
       fullGroupSet,
       ImmutableList.of[ImmutableBitSet](fullGroupSet),
-      newPartialAggCalls)
+      newPartialAggCalls,
+      originalAggregate.getHints)
     partialAggregate.setPartialFinalType(PartialFinalType.PARTIAL)
     relBuilder.push(partialAggregate)
 
@@ -357,7 +358,8 @@ class SplitAggregateRule
       relBuilder.build(),
       SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
       SplitAggregateRule.remap(fullGroupSet, 
Seq(originalAggregate.getGroupSet)),
-      finalAggCalls
+      finalAggCalls,
+      originalAggregate.getHints
     )
     finalAggregate.setPartialFinalType(PartialFinalType.FINAL)
     relBuilder.push(finalAggregate)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
index d04f0b9bbf3..2fb63c0bcb0 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
@@ -93,7 +93,9 @@ class IncrementalAggregateRule
       partialGlobalAgg.aggCallNeedRetractions,
       partialGlobalAgg.needRetraction,
       partialLocalAggInputRowType,
-      partialGlobalAgg.getRowType)
+      partialGlobalAgg.getRowType,
+      partialGlobalAgg.hints
+    )
     val incrAggOutputRowType = incrAgg.getRowType
 
     val newExchange = exchange.copy(exchange.getTraitSet, incrAgg, 
exchange.distribution)
@@ -142,7 +144,8 @@ class IncrementalAggregateRule
         finalGlobalAgg.localAggInputRowType,
         partialGlobalAgg.needRetraction,
         finalGlobalAgg.partialFinalType,
-        partialGlobalAgg.globalAggInfoList.indexOfCountStar)
+        partialGlobalAgg.globalAggInfoList.indexOfCountStar,
+        finalGlobalAgg.hints)
     }
 
     call.transformTo(globalAgg)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index 92f1b76693e..3495ebea06e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -73,7 +73,8 @@ class StreamPhysicalGroupAggregateRule(config: Config) 
extends ConverterRule(con
       rel.getRowType,
       agg.getGroupSet.toArray,
       agg.getAggCallList,
-      agg.partialFinalType)
+      agg.partialFinalType,
+      agg.getHints)
   }
 }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
index 272aeb8dfbb..30a792f8383 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
@@ -137,7 +137,9 @@ class TwoStageOptimizedAggregateRule
       aggCallNeedRetractions,
       realInput.getRowType,
       needRetraction,
-      originalAgg.partialFinalType)
+      originalAgg.partialFinalType,
+      Option.empty,
+      originalAgg.hints)
 
     call.transformTo(globalAgg)
   }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
index 62580a63b33..f767be6a8f7 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
@@ -21,10 +21,10 @@ import 
org.apache.flink.table.planner.analyze.{FlinkStreamPlanAnalyzers, PlanAdv
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
+import org.apache.flink.table.planner.plan.nodes.physical.stream._
 
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Correlate, Join, TableScan}
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Join, TableScan}
 import org.apache.calcite.rel.externalize.RelWriterImpl
 import org.apache.calcite.rel.hint.Hintable
 import org.apache.calcite.sql.SqlExplainLevel
@@ -150,6 +150,16 @@ class RelTreeWriterImpl(
             printValues.add(Pair.of("stateTtlHints", 
RelExplainUtil.hintsToString(stateTtlHints)))
           }
 
+        case _: Aggregate | _: StreamPhysicalGroupAggregateBase =>
+          val aggHints =
+            rel match {
+              case aggregate: Aggregate => aggregate.getHints
+              case _ => 
rel.asInstanceOf[StreamPhysicalGroupAggregateBase].hints
+            }
+          val stateTtlHints = FlinkHints.getAllStateTtlHints(aggHints)
+          if (stateTtlHints.nonEmpty) {
+            printValues.add(Pair.of("stateTtlHints", 
RelExplainUtil.hintsToString(stateTtlHints)))
+          }
         case _ => // ignore
       }
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
index 4fd658cf97a..af3d9bbdde1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
@@ -112,6 +112,19 @@ abstract class 
ClearQueryHintsWithInvalidPropagationShuttleTestBase extends Tabl
                                 + isBatchMode()
                                 + "'\n"
                                 + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE t4 (\n"
+                                + "  a BIGINT,\n"
+                                + "  b BIGINT,\n"
+                                + "  c BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'bounded' = '"
+                                + isBatchMode()
+                                + "'\n"
+                                + ")");
     }
 
     protected String buildRelPlanWithQueryBlockAlias(RelNode node) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
index c052338bcdf..6bd57e3d16b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.table.planner.utils.TableTestUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -51,7 +53,7 @@ public class 
ClearStateTtlHintsWithInvalidPropagationShuttleTest
     }
 
     @Test
-    void testNoNeedToClearStateTtlHint() {
+    void testNoNeedToClearStateTtlHintOnJoin() {
         //  SELECT t4.a FROM (
         //      SELECT /*+ StaTe_TtL("t1" = "1d", "t2" = "7d")*/t1.a FROM t1 
JOIN t2 ON t1.a = t2.a
         //  ) t4 JOIN t3 ON t4.a = t3.a
@@ -82,7 +84,7 @@ public class 
ClearStateTtlHintsWithInvalidPropagationShuttleTest
     }
 
     @Test
-    void testClearStateTtlHint() {
+    void testClearStateTtlHintOnJoin() {
         //  SELECT /*+ StaTe_TtL("t4" = "9d", "t3" = "12d")*/t4.a FROM (
         //      SELECT /*+ StaTe_TtL("t1" = "1d", "t2" = "7d")*/t1.a FROM t1 
JOIN t2 ON t1.a = t2.a
         //  ) t4 JOIN t3 ON t4.a = t3.a
@@ -119,4 +121,72 @@ public class 
ClearStateTtlHintsWithInvalidPropagationShuttleTest
 
         verifyRelPlan(root);
     }
+
+    @Test
+    void testNoNeedToClearStateTtlHintOnAggregate() {
+        // Plan under test; only the inner query block carries a STATE_TTL hint:
+        //  SELECT tmp.a, max(tmp.cnt) as max FROM (
+        //      SELECT /*+ StaTe_TtL("t4" = "1d")*/t4.a, t4.b, count(t4.c) as cnt
+        //          FROM t4 GROUP BY t4.a, t4.b
+        //  ) tmp GROUP BY tmp.a
+        RelHint tmpAlias = RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("tmp").build();
+        RelHint innerTtl =
+                RelHint.builder("StaTe_TtL")
+                        .hintOptions(Collections.singletonMap("t4", "1d"))
+                        .build();
+
+        RelNode plan =
+                builder.scan("t4")
+                        .aggregate(
+                                builder.groupKey(ImmutableBitSet.of(0, 1)),
+                                Collections.singletonList(
+                                        builder.count(builder.field(1, 0, "c")).as("cnt")))
+                        .hints(innerTtl, tmpAlias)
+                        .project(builder.field(1, 0, "a"), builder.field(1, 0, "cnt"))
+                        .aggregate(
+                                builder.groupKey(ImmutableBitSet.of(0)),
+                                Collections.singletonList(
+                                        builder.max(builder.field(1, 0, "cnt")).as("max")))
+                        .project(builder.field(1, 0, "a"), builder.field(1, 0, "max"))
+                        .build();
+        verifyRelPlan(plan);
+    }
+
+    @Test
+    void testClearStateTtlHintOnAggregate() {
+        // Plan under test; both the outer and the inner query block carry a STATE_TTL hint:
+        //  SELECT /*+ StaTe_TtL("tmp" = "2d")*/tmp.a, max(tmp.cnt) as max FROM (
+        //      SELECT /*+ StaTe_TtL("t4" = "1d")*/t4.a, t4.b, count(t4.c) as cnt
+        //          FROM t4 GROUP BY t4.a, t4.b
+        //  ) tmp GROUP BY tmp.a
+        RelHint tmpAlias = RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("tmp").build();
+        RelHint innerTtl =
+                RelHint.builder("StaTe_TtL")
+                        .hintOptions(Collections.singletonMap("t4", "1d"))
+                        .build();
+        RelHint outerTtl =
+                RelHint.builder("StaTe_TtL")
+                        .hintOptions(Collections.singletonMap("tmp", "2d"))
+                        .build();
+
+        RelNode plan =
+                builder.scan("t4")
+                        .aggregate(
+                                builder.groupKey(ImmutableBitSet.of(0, 1)),
+                                Collections.singletonList(
+                                        builder.count(builder.field(1, 0, "c")).as("cnt")))
+                        // inner query block: hinted aggregate, aliased as "tmp"
+                        .hints(innerTtl, tmpAlias)
+                        .project(builder.field(1, 0, "a"), builder.field(1, 0, "cnt"))
+                        .aggregate(
+                                builder.groupKey(ImmutableBitSet.of(0)),
+                                Collections.singletonList(
+                                        builder.max(builder.field(1, 0, "cnt")).as("max")))
+                        .project(builder.field(1, 0, "a"), builder.field(1, 0, "max"))
+                        // outer query block: the hint is attached after the final projection
+                        .hints(outerTtl)
+                        .build();
+
+        verifyRelPlan(plan);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
index 36c30bd740d..674fdc602ba 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
@@ -70,6 +70,10 @@ class StateTtlHintTest extends TableTestBase {
 
         util.tableEnv()
                 .executeSql("create view V5 as select T1.* from T1 join T2 on 
T1.a1 = T2.a2");
+
+        util.tableEnv()
+                .executeSql(
+                        "create view V6 as select a1, b1, count(*) as cnt from 
T1 group by a1, b1");
     }
 
     @Test
@@ -193,7 +197,104 @@ class StateTtlHintTest extends TableTestBase {
         assertThatThrownBy(() -> verify(sql))
                 .isInstanceOf(AssertionError.class)
                 .hasMessageContaining(
-                        "Invalid hint about STATE_TTL, expecting at least one 
key-value options specified.");
+                        "Invalid STATE_TTL hint, expecting at least one 
key-value options specified.");
+    }
+
+    @Test
+    void testSimpleAggStateTtl() {
+        String sql = "select /*+ STATE_TTL('T1' = '2d') */ count(*) from T1 
group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlWithUnknownTable() {
+        String sql = "select /*+ STATE_TTL('T2' = '2d') */ count(*) from T1 
group by a1";
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The options of following hints cannot match the name 
of input tables or views: \n`%s` in `%s`",
+                        "T2", "STATE_TTL");
+    }
+
+    @Test
+    void testAggStateTtlWithView() {
+        String sql = "select /*+ STATE_TTL('V6' = '2d') */ max(b1) from V6 
group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlWithUnknownView() {
+        String sql =
+                "select /*+ STATE_TTL('T1' = '2d') */ max(b1) from "
+                        + "(select a1, b1, count(*) from T1 group by a1, b1) 
TMP group by a1";
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The options of following hints cannot match the name 
of input tables or views: \n`%s` in `%s`",
+                        "T1", "STATE_TTL");
+    }
+
+    @Test
+    void testMultiAggStateTtl() {
+        String sql =
+                "select /*+ STATE_TTL('T1' = '2d'), STATE_TTL('T1' = '8d') */ 
count(*) from T1 group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testSingleAggStateTtlWithMultiKV() {
+        String sql =
+                "select /*+ STATE_TTL('T1' = '2d', 'T1' = '8d') */ count(*) 
from T1 group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlWithCascadeAgg() {
+        String sql =
+                "select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from "
+                        + "(select /*+ STATE_TTL('T1' = '4d') */ a1, b1, 
count(*) from T1 group by a1, b1) TMP group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlNotPropagateOutOfView() {
+        String sql =
+                "select max(b1) from "
+                        + "(select /*+ STATE_TTL('T1' = '4d') */ a1, b1, 
count(*) from T1 group by a1, b1) TMP group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlNotPropagateIntoView() {
+        String sql =
+                "select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from "
+                        + "(select a1, b1, count(*) from T1 group by a1, b1) 
TMP group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlWithJoin() {
+        String sql =
+                "select /*+ STATE_TTL('T1' = '2d') */ max(b1) from "
+                        + "(select T1.* from T1 join T2 on T1.a1 = T2.a2) T1 
group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlWithJoinHint() {
+        String sql =
+                "select /*+ STATE_TTL('T1' = '2d') */ max(b1) from "
+                        + "(select  /*+ BROADCAST(T1) */T1.* from T1 join T2 
on T1.a1 = T2.a2) T1 group by a1";
+        verify(sql);
+    }
+
+    @Test
+    void testAggStateTtlWithEmptyKV() {
+        String sql = "select /*+ STATE_TTL() */ max(b1) from T1 group by a1";
+        assertThatThrownBy(() -> verify(sql))
+                .isInstanceOf(AssertionError.class)
+                .hasMessageContaining(
+                        "Invalid STATE_TTL hint, expecting at least one 
key-value options specified.");
     }
 
     private void verify(String sql) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
index 6a146cb7448..b5a9d8fdf2d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
@@ -93,11 +93,13 @@ public class StateMetadataTest {
     @ParameterizedTest
     public void testGetOneInputOperatorDefaultMeta(
             Consumer<TableConfig> configModifier,
+            @Nullable Long stateTtlFromHint,
             String expectedStateName,
             long expectedTtlMillis) {
         configModifier.accept(tableConfig);
         List<StateMetadata> stateMetadataList =
-                StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, 
expectedStateName);
+                StateMetadata.getOneInputOperatorDefaultMeta(
+                        stateTtlFromHint, tableConfig, expectedStateName);
         assertThat(stateMetadataList).hasSize(1);
         assertThat(stateMetadataList.get(0))
                 .isEqualTo(
@@ -173,12 +175,22 @@ public class StateMetadataTest {
 
     public static Stream<Arguments> provideConfigForOneInput() {
         return Stream.of(
-                Arguments.of((Consumer<TableConfig>) config -> {}, "fooState", 
0L),
+                Arguments.of((Consumer<TableConfig>) config -> {}, null, 
"fooState", 0L),
                 Arguments.of(
                         (Consumer<TableConfig>)
                                 config -> config.set(IDLE_STATE_RETENTION, 
Duration.ofMinutes(10)),
+                        null,
                         "barState",
-                        600000L));
+                        600000L),
+                Arguments.of((Consumer<TableConfig>) config -> {}, 100L, 
"bazState", 100L),
+                // state ttl from the hint gets a higher priority over 
job-level
+                // table.exec.state.ttl
+                Arguments.of(
+                        (Consumer<TableConfig>)
+                                config -> config.set(IDLE_STATE_RETENTION, 
Duration.ofMinutes(10)),
+                        200L,
+                        "quxState",
+                        200L));
     }
 
     public static Stream<Arguments> provideConfigForMultiInput() {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
index 2c5043d9f4d..300cf8ca012 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
@@ -41,6 +41,7 @@ public class GroupAggregateRestoreTest extends 
RestoreTestBase {
                 GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE,
                 GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE_MINI_BATCH,
                 GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE,
-                
GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE_MINI_BATCH);
+                
GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE_MINI_BATCH,
+                GroupAggregateTestPrograms.AGG_WITH_STATE_TTL_HINT);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
index b6a61a72144..6c43cf90959 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
@@ -371,4 +371,33 @@ public class GroupAggregateTestPrograms {
                                     .build())
                     .runSql(GROUP_BY_UDF_WITHOUT_MERGE.getRunSqlTestStep().sql)
                     .build();
+
+    static final TableTestProgram AGG_WITH_STATE_TTL_HINT =
+            TableTestProgram.of("agg-with-state-ttl-hint", "agg with state ttl 
hint")
+                    .setupTableSource(SOURCE_ONE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "b BIGINT",
+                                            "cnt BIGINT",
+                                            "avg_a DOUBLE",
+                                            "min_c VARCHAR",
+                                            "PRIMARY KEY (b) NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, null, Hi]",
+                                            "+I[2, 1, 2.0, Hello]",
+                                            "+U[2, 2, 2.0, Hello]")
+                                    .consumedAfterRestore(
+                                            "+U[1, 2, null, Hi]",
+                                            "+U[2, 3, 2.0, Hello]",
+                                            "+U[2, 4, 2.0, Hello]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT /*+ 
STATE_TTL('source_t' = '4d') */"
+                                    + "b, "
+                                    + "COUNT(*) AS cnt, "
+                                    + "AVG(a) FILTER (WHERE a > 1) AS avg_a, "
+                                    + "MIN(c) AS min_c "
+                                    + "FROM source_t GROUP BY b")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
index ba5f48faaa7..0efeccb952f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
@@ -43,7 +43,8 @@ import java.util.function.Function;
 class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
 
     @Test
-    void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
+    void 
testDifferentStateTtlThroughCompiledPlanForDifferentOneInputStreamOperator()
+            throws Exception {
         innerTestDeduplicateAndGroupAggregate(
                 "INSERT INTO OrdersStats \n"
                         + "SELECT buyer, COUNT(1) AS ord_cnt, SUM(quantity) AS 
quantity_cnt, SUM(amount) AS total_amount FROM (\n"
@@ -65,7 +66,32 @@ class ConfigureOperatorLevelStateTtlJsonITCase extends 
JsonPlanTestBase {
     }
 
     @Test
-    void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception 
{
+    void 
testDifferentStateTtlThroughSqlHintForDifferentOneInputStreamOperator() throws 
Exception {
+        tableEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
+        tableEnv.getConfig().set("table.exec.mini-batch.size", "2");
+        tableEnv.getConfig().set("table.exec.mini-batch.allow-latency", "1");
+        innerTestDeduplicateAndGroupAggregate(
+                "INSERT INTO OrdersStats \n"
+                        + "SELECT /*+STATE_TTL('tmp' = '9s')*/ buyer, COUNT(1) 
AS ord_cnt, SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount \n"
+                        + "FROM (\n"
+                        + "    SELECT *, ROW_NUMBER() OVER(PARTITION BY 
order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders\n"
+                        + ") tmp\n"
+                        + "WHERE rk = 1\n"
+                        + "GROUP BY buyer",
+                json -> {
+                    try {
+                        JsonNode target = JsonTestUtils.readFromString(json);
+                        JsonTestUtils.setExecNodeStateMetadata(
+                                target, "stream-exec-deduplicate", 0, 6000L);
+                        return JsonTestUtils.writeToString(target);
+                    } catch (IOException e) {
+                        throw new TableException("Cannot modify compiled json 
plan.", e);
+                    }
+                });
+    }
+
+    @Test
+    void 
testDifferentStateTtlThroughCompiledPlanForSameTwoInputStreamOperator() throws 
Exception {
         innerTestRegularJoin(
                 "INSERT INTO OrdersShipInfo \n"
                         + "SELECT a.order_id, a.line_order_id, b.ship_mode 
FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
index 5d2f1a825d6..e33b11a557b 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
@@ -16,7 +16,41 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testClearStateTtlHint">
+  <TestCase name="testClearStateTtlHintOnAggregate">
+    <Resource name="beforePropagatingHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[StaTe_TtL 
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[StaTe_TtL options:{t4=1d}]]], hints=[[[ALIAS 
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+    <Resource name="afterPropagatingHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[StaTe_TtL 
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[StaTe_TtL options:{t4=1d}][StaTe_TtL inheritPath:[0, 0] 
options:{tmp=2d}]]], hints=[[[ALIAS options:[tmp]]]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+    <Resource name="afterCapitalizeJoinHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[STATE_TTL 
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[STATE_TTL options:{t4=1d}][STATE_TTL inheritPath:[0, 0] 
options:{tmp=2d}]]], hints=[[[ALIAS options:[tmp]]]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+    <Resource name="afterClearingJoinHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[STATE_TTL 
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[STATE_TTL options:{t4=1d}]]], hints=[[[ALIAS 
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNoNeedToClearStateTtlHintOnJoin">
     <Resource name="beforePropagatingHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
@@ -31,9 +65,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
     <Resource name="afterPropagatingHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[StAte_tTl inheritPath:[0] options:{t4=9d, t3=12d}]]]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
    :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]), 
rowType=[RecordType(BIGINT a)]
-   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}][StAte_tTl 
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS 
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}]]], 
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, 
BIGINT a0)]
    :     :- LogicalTableScan(table=[[builtin, default, t1]]), 
rowType=[RecordType(BIGINT a)]
    :     +- LogicalTableScan(table=[[builtin, default, t2]]), 
rowType=[RecordType(BIGINT a)]
    +- LogicalTableScan(table=[[builtin, default, t3]]), 
rowType=[RecordType(BIGINT a)]
@@ -42,9 +76,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
     <Resource name="afterCapitalizeJoinHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
    :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]), 
rowType=[RecordType(BIGINT a)]
-   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}][STATE_TTL 
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS 
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]], 
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, 
BIGINT a0)]
    :     :- LogicalTableScan(table=[[builtin, default, t1]]), 
rowType=[RecordType(BIGINT a)]
    :     +- LogicalTableScan(table=[[builtin, default, t2]]), 
rowType=[RecordType(BIGINT a)]
    +- LogicalTableScan(table=[[builtin, default, t3]]), 
rowType=[RecordType(BIGINT a)]
@@ -53,7 +87,7 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
     <Resource name="afterClearingJoinHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
    :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]), 
rowType=[RecordType(BIGINT a)]
    :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]], 
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, 
BIGINT a0)]
    :     :- LogicalTableScan(table=[[builtin, default, t1]]), 
rowType=[RecordType(BIGINT a)]
@@ -62,7 +96,7 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testNoNeedToClearStateTtlHint">
+  <TestCase name="testClearStateTtlHintOnJoin">
     <Resource name="beforePropagatingHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
@@ -77,9 +111,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
     <Resource name="afterPropagatingHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[StAte_tTl inheritPath:[0] options:{t4=9d, t3=12d}]]]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
    :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]), 
rowType=[RecordType(BIGINT a)]
-   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}]]], 
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, 
BIGINT a0)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}][StAte_tTl 
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS 
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
    :     :- LogicalTableScan(table=[[builtin, default, t1]]), 
rowType=[RecordType(BIGINT a)]
    :     +- LogicalTableScan(table=[[builtin, default, t2]]), 
rowType=[RecordType(BIGINT a)]
    +- LogicalTableScan(table=[[builtin, default, t3]]), 
rowType=[RecordType(BIGINT a)]
@@ -88,9 +122,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
     <Resource name="afterCapitalizeJoinHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
    :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]), 
rowType=[RecordType(BIGINT a)]
-   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]], 
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, 
BIGINT a0)]
+   :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}][STATE_TTL 
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS 
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
    :     :- LogicalTableScan(table=[[builtin, default, t1]]), 
rowType=[RecordType(BIGINT a)]
    :     +- LogicalTableScan(table=[[builtin, default, t2]]), 
rowType=[RecordType(BIGINT a)]
    +- LogicalTableScan(table=[[builtin, default, t3]]), 
rowType=[RecordType(BIGINT a)]
@@ -99,12 +133,46 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
     <Resource name="afterClearingJoinHints">
       <![CDATA[
 LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]), 
rowType=[RecordType(BIGINT a, BIGINT a0)]
    :- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]), 
rowType=[RecordType(BIGINT a)]
    :  +- LogicalJoin(condition=[=($0, $1)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]], 
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, 
BIGINT a0)]
    :     :- LogicalTableScan(table=[[builtin, default, t1]]), 
rowType=[RecordType(BIGINT a)]
    :     +- LogicalTableScan(table=[[builtin, default, t2]]), 
rowType=[RecordType(BIGINT a)]
    +- LogicalTableScan(table=[[builtin, default, t3]]), 
rowType=[RecordType(BIGINT a)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNoNeedToClearStateTtlHintOnAggregate">
+    <Resource name="beforePropagatingHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a, 
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[StaTe_TtL options:{t4=1d}]]], hints=[[[ALIAS 
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+    <Resource name="afterPropagatingHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a, 
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[StaTe_TtL options:{t4=1d}]]], hints=[[[ALIAS 
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+    <Resource name="afterCapitalizeJoinHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a, 
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[STATE_TTL options:{t4=1d}]]], hints=[[[ALIAS 
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+    </Resource>
+    <Resource name="afterClearingJoinHints">
+      <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a, 
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)], 
stateTtlHints=[[[STATE_TTL options:{t4=1d}]]], hints=[[[ALIAS 
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+      +- LogicalTableScan(table=[[builtin, default, t4]]), 
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
index 5e525037b65..b395e6002b2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
@@ -16,6 +16,187 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testAggStateTtlNotPropagateIntoView">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from (select a1, 
b1, count(*) from T1 group by a1, b1) TMP group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{TMP=2d}]]])
+   +- LogicalProject(a1=[$0], b1=[$1])
+      +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], hints=[[[ALIAS 
options:[TMP]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1])
+         +- Exchange(distribution=[hash[a1, b1]])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAggStateTtlNotPropagateOutOfView">
+    <Resource name="sql">
+      <![CDATA[select max(b1) from (select /*+ STATE_TTL('T1' = '4d') */ a1, 
b1, count(*) from T1 group by a1, b1) TMP group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+   +- LogicalProject(a1=[$0], b1=[$1])
+      +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], 
stateTtlHints=[[[STATE_TTL options:{T1=4d}]]], hints=[[[ALIAS options:[TMP]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0])
+   +- Exchange(distribution=[hash[a1]])
+      +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1], 
stateTtlHints=[[[STATE_TTL options:[4d]]]])
+         +- Exchange(distribution=[hash[a1, b1]])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAggStateTtlWithCascadeAgg">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from (select /*+ 
STATE_TTL('T1' = '4d') */ a1, b1, count(*) from T1 group by a1, b1) TMP group 
by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{TMP=2d}]]])
+   +- LogicalProject(a1=[$0], b1=[$1])
+      +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], 
stateTtlHints=[[[STATE_TTL options:{T1=4d}]]], hints=[[[ALIAS options:[TMP]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1], 
stateTtlHints=[[[STATE_TTL options:[4d]]]])
+         +- Exchange(distribution=[hash[a1, b1]])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAggStateTtlWithJoin">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('T1' = '2d') */ max(b1) from (select T1.* 
from T1 join T2 on T1.a1 = T2.a2) T1 group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{T1=2d}]]])
+   +- LogicalProject(a1=[$0], b1=[$1], hints=[[[ALIAS options:[T1]]]])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], hints=[[[ALIAS 
inheritPath:[0] options:[T1]]]])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- Calc(select=[a1, b1])
+         +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a1]])
+            :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+            +- Exchange(distribution=[hash[a2]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAggStateTtlWithJoinHint">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('T1' = '2d') */ max(b1) from (select  /*+ 
BROADCAST(T1) */T1.* from T1 join T2 on T1.a1 = T2.a2) T1 group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{T1=2d}]]])
+   +- LogicalProject(a1=[$0], b1=[$1], hints=[[[ALIAS options:[T1]]]])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]], hints=[[[ALIAS 
inheritPath:[0] options:[T1]]]])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- Calc(select=[a1, b1])
+         +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
joinHints=[[[BROADCAST options:[LEFT]]]])
+            :- Exchange(distribution=[hash[a1]])
+            :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+            +- Exchange(distribution=[hash[a2]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testAggStateTtlWithView">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('V6' = '2d') */ max(b1) from V6 group by 
a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{V6=2d}]]])
+   +- LogicalProject(a1=[$0], b1=[$1])
+      +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()], hints=[[[ALIAS 
options:[V6]]]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1])
+         +- Exchange(distribution=[hash[a1, b1]])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDuplicateJoinStateTtlHint">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d'), STATE_TTL('T1' 
= '1d', 'T2' = '8d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 = 
T3.b3]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($3, $5))])
+   +- LogicalJoin(condition=[true], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T2=2d, T3=3d}][STATE_TTL 
inheritPath:[0, 0] options:{T1=1d, T2=8d}]]])
+      :- LogicalJoin(condition=[true], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0, 0] options:{T2=2d, 
T3=3d}][STATE_TTL inheritPath:[0, 0, 0] options:{T1=1d, T2=8d}]]])
+      :  :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Join(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{RIGHT=3d}]]])
+:- Exchange(distribution=[hash[b2]])
+:  +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=2d}]]])
+:     :- Exchange(distribution=[hash[a1]])
+:     :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
+:     +- Exchange(distribution=[hash[a2]])
+:        +- TableSourceScan(table=[[default_catalog, default_database, T2]], 
fields=[a2, b2])
++- Exchange(distribution=[hash[b3]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T3]], 
fields=[a3, b3])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testJoinStateTtlHintNotPropagateIntoView">
     <Resource name="sql">
       <![CDATA[select /*+ STATE_TTL('T1' = '1d')*/T1.* from T1 join V5 on 
T1.a1 = V5.a1]]>
@@ -108,7 +289,7 @@ Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, 
b1, a2, b2], leftInput
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], stateTtlHints=[[[STATE_TTL 
options:{T1=1d, T2=2d}]]])
 +- LogicalProject(b1=[$1], a1=[$0])
    +- LogicalJoin(condition=[=($1, $3)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T1=1d, T2=2d}]]])
       :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
@@ -125,32 +306,6 @@ GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS 
EXPR$1])
          :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
          +- Exchange(distribution=[hash[b2]])
             +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[b2], metadata=[]]], fields=[b2])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testStateTtlHintWithJoinHint">
-    <Resource name="sql">
-      <![CDATA[select /*+ STATE_TTL('T1' = '1d', 'T2' = '2d'), BROADCAST(T1) 
*/T1.b1, sum(T1.a1) from T1 join T2 on T1.b1 = T2.b2 group by T1.b1]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
-+- LogicalProject(b1=[$1], a1=[$0])
-   +- LogicalJoin(condition=[=($1, $3)], joinType=[inner], 
joinHints=[[[BROADCAST inheritPath:[0, 0] options:[T1]]]], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T1=1d, T2=2d}]]])
-      :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS EXPR$1])
-+- Exchange(distribution=[hash[b1]])
-   +- Calc(select=[b1, a1])
-      +- Join(joinType=[InnerJoin], where=[=(b1, b2)], select=[a1, b1, b2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
joinHints=[[[BROADCAST options:[LEFT]]]], stateTtlHints=[[[STATE_TTL 
options:{LEFT=1d, RIGHT=2d}]]])
-         :- Exchange(distribution=[hash[b1]])
-         :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
-         +- Exchange(distribution=[hash[b2]])
-            +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[b2], metadata=[]]], fields=[b2])
 ]]>
     </Resource>
   </TestCase>
@@ -202,6 +357,71 @@ Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, 
b1, a2, b2], leftInput
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], 
fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinStateTtlHintWithView">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('T1' = '2d', 'V4' = '1d') */* from T1 join 
V4 on T1.a1 = V4.a4]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a4=[$2], b4=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{T1=2d, V4=1d}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalProject(a4=[$0], b4=[$1], hints=[[[ALIAS options:[V4]]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, a3, b3], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{LEFT=2d, RIGHT=1d}]]])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
++- Exchange(distribution=[hash[a3]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T3]], 
fields=[a3, b3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiAggStateTtl">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('T1' = '2d'), STATE_TTL('T1' = '8d') */ 
count(*) from T1 group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{T1=2d}][STATE_TTL inheritPath:[0] options:{T1=8d}]]])
+   +- LogicalProject(a1=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, COUNT(*) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T1, 
project=[a1], metadata=[]]], fields=[a1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSimpleAggStateTtl">
+    <Resource name="sql">
+      <![CDATA[select /*+ STATE_TTL('T1' = '2d') */ count(*) from T1 group by 
a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{T1=2d}]]])
+   +- LogicalProject(a1=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, COUNT(*) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T1, 
project=[a1], metadata=[]]], fields=[a1])
 ]]>
     </Resource>
   </TestCase>
@@ -227,55 +447,50 @@ Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, 
b1, a2, b2], leftInput
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinStateTtlHintWithView">
+  <TestCase name="testSingleAggStateTtlWithMultiKV">
     <Resource name="sql">
-      <![CDATA[select /*+ STATE_TTL('T1' = '2d', 'V4' = '1d') */* from T1 join 
V4 on T1.a1 = V4.a4]]>
+      <![CDATA[select /*+ STATE_TTL('T1' = '2d', 'T1' = '8d') */ count(*) from 
T1 group by a1]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a4=[$2], b4=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{T1=2d, V4=1d}]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-   +- LogicalProject(a4=[$0], b4=[$1], hints=[[[ALIAS options:[V4]]]])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], stateTtlHints=[[[STATE_TTL 
inheritPath:[0] options:{T1=8d}]]])
+   +- LogicalProject(a1=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, a3, b3], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{LEFT=2d, RIGHT=1d}]]])
-:- Exchange(distribution=[hash[a1]])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
-+- Exchange(distribution=[hash[a3]])
-   +- TableSourceScan(table=[[default_catalog, default_database, T3]], 
fields=[a3, b3])
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, COUNT(*) AS EXPR$0], 
stateTtlHints=[[[STATE_TTL options:[8d]]]])
+   +- Exchange(distribution=[hash[a1]])
+      +- TableSourceScan(table=[[default_catalog, default_database, T1, 
project=[a1], metadata=[]]], fields=[a1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testDuplicateJoinStateTtlHint">
+  <TestCase name="testStateTtlHintWithJoinHint">
     <Resource name="sql">
-      <![CDATA[select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d'), STATE_TTL('T1' 
= '1d', 'T2' = '8d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 = 
T3.b3]]>
+      <![CDATA[select /*+ STATE_TTL('T1' = '1d', 'T2' = '2d'), BROADCAST(T1) 
*/T1.b1, sum(T1.a1) from T1 join T2 on T1.b1 = T2.b2 group by T1.b1]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5])
-+- LogicalFilter(condition=[AND(=($0, $2), =($3, $5))])
-   +- LogicalJoin(condition=[true], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T2=2d, T3=3d}][STATE_TTL 
inheritPath:[0, 0] options:{T1=1d, T2=8d}]]])
-      :- LogicalJoin(condition=[true], joinType=[inner], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0, 0] options:{T2=2d, 
T3=3d}][STATE_TTL inheritPath:[0, 0, 0] options:{T1=1d, T2=8d}]]])
-      :  :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-      :  +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-      +- LogicalTableScan(table=[[default_catalog, default_database, T3]], 
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], stateTtlHints=[[[STATE_TTL 
options:{T1=1d, T2=2d}]]])
++- LogicalProject(b1=[$1], a1=[$0])
+   +- LogicalJoin(condition=[=($1, $3)], joinType=[inner], 
joinHints=[[[BROADCAST inheritPath:[0, 0] options:[T1]]]], 
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T1=1d, T2=2d}]]])
+      :- LogicalTableScan(table=[[default_catalog, default_database, T1]], 
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T2]], 
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Join(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{RIGHT=3d}]]])
-:- Exchange(distribution=[hash[b2]])
-:  +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=2d}]]])
-:     :- Exchange(distribution=[hash[a1]])
-:     :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], 
fields=[a1, b1])
-:     +- Exchange(distribution=[hash[a2]])
-:        +- TableSourceScan(table=[[default_catalog, default_database, T2]], 
fields=[a2, b2])
-+- Exchange(distribution=[hash[b3]])
-   +- TableSourceScan(table=[[default_catalog, default_database, T3]], 
fields=[a3, b3])
+GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS EXPR$1])
++- Exchange(distribution=[hash[b1]])
+   +- Calc(select=[b1, a1])
+      +- Join(joinType=[InnerJoin], where=[=(b1, b2)], select=[a1, b1, b2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
joinHints=[[[BROADCAST options:[LEFT]]]], stateTtlHints=[[[STATE_TTL 
options:{LEFT=1d, RIGHT=2d}]]])
+         :- Exchange(distribution=[hash[b1]])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1]], fields=[a1, b1])
+         +- Exchange(distribution=[hash[b2]])
+            +- TableSourceScan(table=[[default_catalog, default_database, T2, 
project=[b2], metadata=[]]], fields=[b2])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/plan/agg-with-state-ttl-hint.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/plan/agg-with-state-ttl-hint.json
new file mode 100644
index 00000000000..cebe1e5898b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/plan/agg-with-state-ttl-hint.json
@@ -0,0 +1,272 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 52,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 53,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "POSTFIX",
+      "internalName" : "$IS TRUE$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 0,
+          "type" : "INT"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 1,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` BOOLEAN NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[b, a, (a > 1) IS TRUE AS $f2, c])"
+  }, {
+    "id" : 54,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` BOOLEAN NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 55,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "cnt",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "avg_a",
+      "internalName" : "$AVG$1",
+      "argList" : [ 1 ],
+      "filterArg" : 2,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "INT"
+    }, {
+      "name" : "min_c",
+      "internalName" : "$MIN$1",
+      "argList" : [ 3 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "aggCallNeedRetractions" : [ false, false, false ],
+    "generateUpdateBefore" : false,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "4 d",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `cnt` BIGINT NOT NULL, `avg_a` INT, 
`min_c` VARCHAR(2147483647)>",
+    "description" : "GroupAggregate(groupBy=[b], select=[b, COUNT(*) AS cnt, 
AVG(a) FILTER $f2 AS avg_a, MIN(c) AS min_c])"
+  }, {
+    "id" : 56,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "DOUBLE"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `cnt` BIGINT, `avg_a` DOUBLE, `min_c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[b, CAST(cnt AS BIGINT) AS cnt, CAST(avg_a AS 
DOUBLE) AS avg_a, min_c])"
+  }, {
+    "id" : 57,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT NOT NULL"
+            }, {
+              "name" : "cnt",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "avg_a",
+              "dataType" : "DOUBLE"
+            }, {
+              "name" : "min_c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_b",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "b" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `cnt` BIGINT, `avg_a` DOUBLE, `min_c` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[b, cnt, avg_a, min_c])"
+  } ],
+  "edges" : [ {
+    "source" : 52,
+    "target" : 53,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 53,
+    "target" : 54,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 54,
+    "target" : 55,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 55,
+    "target" : 56,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 56,
+    "target" : 57,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/savepoint/_metadata
new file mode 100644
index 00000000000..50e4bee3a94
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/savepoint/_metadata
 differ

Reply via email to