This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f32b2a9e347 [FLINK-34053][table-planner] Support state ttl hint for
group aggregate
f32b2a9e347 is described below
commit f32b2a9e347d7539819a88252e4e32deba247515
Author: Xuyang <[email protected]>
AuthorDate: Tue Jan 30 09:31:17 2024 +0800
[FLINK-34053][table-planner] Support state ttl hint for group aggregate
This closes #24179
---
.../planner/calcite/RelTimeIndicatorConverter.java | 3 +-
.../planner/hint/CapitalizeQueryHintsShuttle.java | 11 +-
...earQueryHintsWithInvalidPropagationShuttle.java | 35 ++-
.../table/planner/hint/FlinkHintStrategies.java | 7 +-
.../flink/table/planner/hint/FlinkHints.java | 11 +-
.../table/planner/hint/QueryHintsRelShuttle.java | 13 +-
.../flink/table/planner/hint/StateTtlHint.java | 26 +-
.../planner/plan/nodes/exec/StateMetadata.java | 9 +-
.../stream/StreamExecGlobalGroupAggregate.java | 4 +-
.../exec/stream/StreamExecGroupAggregate.java | 4 +-
.../StreamExecIncrementalGroupAggregate.java | 4 +-
.../planner/plan/optimize/QueryHintsResolver.java | 142 +++++++--
.../planner/calcite/FlinkLogicalRelFactories.scala | 2 +-
.../plan/nodes/logical/FlinkLogicalAggregate.scala | 45 ++-
.../StreamPhysicalGlobalGroupAggregate.scala | 14 +-
.../stream/StreamPhysicalGroupAggregate.scala | 13 +-
.../stream/StreamPhysicalGroupAggregateBase.scala | 8 +-
.../StreamPhysicalIncrementalGroupAggregate.scala | 14 +-
.../nodes/physical/stream/StreamPhysicalJoin.scala | 2 +-
.../plan/rules/logical/SplitAggregateRule.scala | 6 +-
.../physical/stream/IncrementalAggregateRule.scala | 7 +-
.../stream/StreamPhysicalGroupAggregateRule.scala | 3 +-
.../stream/TwoStageOptimizedAggregateRule.scala | 4 +-
.../planner/plan/utils/RelTreeWriterImpl.scala | 14 +-
...HintsWithInvalidPropagationShuttleTestBase.java | 13 +
...eTtlHintsWithInvalidPropagationShuttleTest.java | 74 ++++-
.../plan/hints/stream/StateTtlHintTest.java | 103 ++++++-
.../plan/nodes/exec/serde/StateMetadataTest.java | 18 +-
.../exec/stream/GroupAggregateRestoreTest.java | 3 +-
.../exec/stream/GroupAggregateTestPrograms.java | 29 ++
.../ConfigureOperatorLevelStateTtlJsonITCase.java | 30 +-
...teTtlHintsWithInvalidPropagationShuttleTest.xml | 92 +++++-
.../planner/plan/hints/stream/StateTtlHintTest.xml | 329 +++++++++++++++++----
.../plan/agg-with-state-ttl-hint.json | 272 +++++++++++++++++
.../agg-with-state-ttl-hint/savepoint/_metadata | Bin 0 -> 11151 bytes
35 files changed, 1187 insertions(+), 177 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index c8004ef1867..97a073ac999 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -470,7 +470,8 @@ public final class RelTimeIndicatorConverter extends
RelHomogeneousShuttle {
tableAgg.getInput(),
tableAgg.getGroupSet(),
tableAgg.getGroupSets(),
- tableAgg.getAggCallList());
+ tableAgg.getAggCallList(),
+ Collections.emptyList());
FlinkLogicalAggregate convertedAgg = visitAggregate(correspondingAgg);
return new FlinkLogicalTableAggregate(
tableAgg.getCluster(),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
index 3acbf3fa34a..006dab4b832 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/CapitalizeQueryHintsShuttle.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.hint;
-import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
@@ -32,11 +31,11 @@ import java.util.stream.Collectors;
public class CapitalizeQueryHintsShuttle extends QueryHintsRelShuttle {
@Override
- protected RelNode visitBiRel(BiRel biRel) {
- Hintable hBiRel = (Hintable) biRel;
+ protected RelNode doVisit(RelNode node) {
+ Hintable hNode = (Hintable) node;
AtomicBoolean changed = new AtomicBoolean(false);
List<RelHint> hintsWithCapitalJoinHints =
- hBiRel.getHints().stream()
+ hNode.getHints().stream()
.map(
hint -> {
String capitalHintName =
hint.hintName.toUpperCase(Locale.ROOT);
@@ -69,9 +68,9 @@ public class CapitalizeQueryHintsShuttle extends
QueryHintsRelShuttle {
.collect(Collectors.toList());
if (changed.get()) {
- return super.visit(hBiRel.withHints(hintsWithCapitalJoinHints));
+ return super.visit(hNode.withHints(hintsWithCapitalJoinHints));
} else {
- return super.visit(biRel);
+ return super.visit(node);
}
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
index 8f2d8406cb7..66973683907 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttle.java
@@ -18,11 +18,11 @@
package org.apache.flink.table.planner.hint;
-import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.sql.SqlNode;
@@ -50,15 +50,15 @@ import java.util.stream.Collectors;
public class ClearQueryHintsWithInvalidPropagationShuttle extends
QueryHintsRelShuttle {
@Override
- protected RelNode visitBiRel(BiRel biRel) {
- List<RelHint> hints = ((Hintable) biRel).getHints();
+ protected RelNode doVisit(RelNode node) {
+ List<RelHint> hints = ((Hintable) node).getHints();
Set<String> allHintNames =
hints.stream().map(hint ->
hint.hintName).collect(Collectors.toSet());
// there are no query hints on this Join/Correlate node
if (allHintNames.stream().noneMatch(FlinkHints::isQueryHint)) {
- return super.visit(biRel);
+ return super.visit(node);
}
Optional<RelHint> firstAliasHint =
@@ -66,9 +66,9 @@ public class ClearQueryHintsWithInvalidPropagationShuttle
extends QueryHintsRelS
.filter(hint ->
FlinkHints.HINT_ALIAS.equals(hint.hintName))
.findFirst();
- // there are no alias hints on this Join/Correlate node
+ // there are no alias hints on this Join/Correlate/Aggregate node
if (!firstAliasHint.isPresent()) {
- return super.visit(biRel);
+ return super.visit(node);
}
List<RelHint> queryHintsFromOuterQueryBlock =
@@ -84,10 +84,10 @@ public class ClearQueryHintsWithInvalidPropagationShuttle
extends QueryHintsRelS
.collect(Collectors.toList());
if (queryHintsFromOuterQueryBlock.isEmpty()) {
- return super.visit(biRel);
+ return super.visit(node);
}
- RelNode newRelNode = biRel;
+ RelNode newRelNode = node;
ClearOuterQueryHintShuttle clearOuterQueryHintShuttle;
for (RelHint outerQueryHint : queryHintsFromOuterQueryBlock) {
@@ -128,26 +128,31 @@ public class ClearQueryHintsWithInvalidPropagationShuttle
extends QueryHintsRelS
@Override
public RelNode visit(LogicalCorrelate correlate) {
- return visitBiRel(correlate);
+ return doVisit(correlate);
}
@Override
public RelNode visit(LogicalJoin join) {
- return visitBiRel(join);
+ return doVisit(join);
}
- private RelNode visitBiRel(BiRel biRel) {
- Hintable hBiRel = (Hintable) biRel;
- List<RelHint> hints = new ArrayList<>(hBiRel.getHints());
+ @Override
+ public RelNode visit(LogicalAggregate aggregate) {
+ return doVisit(aggregate);
+ }
+
+ private RelNode doVisit(RelNode node) {
+ Hintable hNode = (Hintable) node;
+ List<RelHint> hints = new ArrayList<>(hNode.getHints());
Optional<RelHint> invalidQueryHint = getInvalidQueryHint(hints);
// if this node contains the query hint that needs to be removed
if (invalidQueryHint.isPresent()) {
hints.remove(invalidQueryHint.get());
- return super.visit(hBiRel.withHints(hints));
+ return super.visit(hNode.withHints(hints));
}
- return super.visit(biRel);
+ return super.visit(node);
}
/**
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
index 8b04002d1ab..afdda70ce41 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
@@ -122,8 +122,9 @@ public abstract class FlinkHintStrategies {
.build())
.hintStrategy(
StateTtlHint.STATE_TTL.getHintName(),
- // TODO support agg state ttl hint
- HintStrategy.builder(HintPredicates.JOIN)
+ HintStrategy.builder(
+ HintPredicates.or(
+ HintPredicates.JOIN,
HintPredicates.AGGREGATE))
.optionChecker(STATE_TTL_NON_EMPTY_KV_OPTION_CHECKER)
.build())
.build();
@@ -252,7 +253,7 @@ public abstract class FlinkHintStrategies {
litmus.check(
ttlHint.kvOptions.size() > 0,
- "Invalid hint about STATE_TTL, expecting at least one
key-value options specified.");
+ "Invalid STATE_TTL hint, expecting at least one
key-value options specified.");
// validate the hint value
ttlHint.kvOptions
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
index dde8a97c2bd..18f13d75472 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
@@ -52,7 +52,7 @@ public abstract class FlinkHints {
// ~ Internal alias tag hint
public static final String HINT_ALIAS = "ALIAS";
- // ~ Option name for hints on join or correlate
+ // ~ Option name for hints on BiRel like join or correlate
public static final String LEFT_INPUT = "LEFT";
public static final String RIGHT_INPUT = "RIGHT";
@@ -267,11 +267,8 @@ public abstract class FlinkHints {
return JoinStrategy.isJoinStrategy(hintName) ||
StateTtlHint.isStateTtlHint(hintName);
}
- /**
- * Currently, lookup join hints and state ttl hints are KV hints. And
regular join hints are
- * LIST hints.
- */
- public static boolean isKVQueryHint(String hintName) {
- return JoinStrategy.isLookupHint(hintName) ||
StateTtlHint.isStateTtlHint(hintName);
+ /** Check if the hint is an alias hint. */
+ public static boolean isAliasHint(String hintName) {
+ return FlinkHints.HINT_ALIAS.equalsIgnoreCase(hintName);
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
index d2a0a47053d..7c02bb0b756 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/QueryHintsRelShuttle.java
@@ -18,9 +18,9 @@
package org.apache.flink.table.planner.hint;
-import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCorrelate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -37,15 +37,20 @@ public abstract class QueryHintsRelShuttle extends
RelShuttleImpl {
if (containsSubQuery(join)) {
join = (LogicalJoin) resolveSubQuery(join, relNode ->
relNode.accept(this));
}
- return visitBiRel(join);
+ return doVisit(join);
}
@Override
public RelNode visit(LogicalCorrelate correlate) {
- return visitBiRel(correlate);
+ return doVisit(correlate);
}
- protected abstract RelNode visitBiRel(BiRel biRel);
+ @Override
+ public RelNode visit(LogicalAggregate aggregate) {
+ return doVisit(aggregate);
+ }
+
+ protected abstract RelNode doVisit(RelNode node);
@Override
public RelNode visit(LogicalFilter filter) {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
index e166ccaf98f..db1896d0d0f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
@@ -22,9 +22,12 @@ import org.apache.flink.util.TimeUtils;
import org.apache.calcite.rel.hint.RelHint;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Hint strategy to configure different {@link
@@ -61,12 +64,13 @@ public enum StateTtlHint {
}
/**
- * Get the state ttl from hints.
+ * Get the state ttl from hints on the {@link
org.apache.calcite.rel.BiRel} such as Join and
+ * Correlate.
*
* @return The key of the map is the input side. The value of the map is
the state ttl in
* milliseconds.
*/
- public static Map<Integer, Long> getStateTtlFromHint(List<RelHint> hints) {
+ public static Map<Integer, Long> getStateTtlFromHintOnBiRel(List<RelHint>
hints) {
Map<Integer, Long> stateTtlFromHint = new java.util.HashMap<>();
hints.stream()
.filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
@@ -87,4 +91,22 @@ public enum StateTtlHint {
return stateTtlFromHint;
}
+
+ /**
+ * Get the state ttl from hints on the {@link
org.apache.calcite.rel.SingleRel} such as
+ * Aggregate.
+ *
+ * @return the state ttl in milliseconds. If no state ttl hint is set,
return "null".
+ */
+ @Nullable
+ public static Long getStateTtlFromHintOnSingleRel(List<RelHint> hints) {
+ List<Long> allStateTtl =
+ hints.stream()
+ .filter(hint ->
StateTtlHint.isStateTtlHint(hint.hintName))
+ .flatMap(hint -> hint.listOptions.stream())
+ .map(ttl -> TimeUtils.parseDuration(ttl).toMillis())
+ .collect(Collectors.toList());
+
+ return allStateTtl.isEmpty() ? null : allStateTtl.get(0);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
index a51b95446e1..d37aeb449a5 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java
@@ -106,10 +106,17 @@ public class StateMetadata {
public static List<StateMetadata> getOneInputOperatorDefaultMeta(
ReadableConfig tableConfig, String stateName) {
+ return getOneInputOperatorDefaultMeta(null, tableConfig, stateName);
+ }
+
+ public static List<StateMetadata> getOneInputOperatorDefaultMeta(
+ @Nullable Long stateTtlFromHint, ReadableConfig tableConfig,
String stateName) {
return Collections.singletonList(
new StateMetadata(
0,
-
tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION),
+ stateTtlFromHint == null
+ ?
tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION)
+ : Duration.ofMillis(stateTtlFromHint),
stateName));
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 37809b9cc92..95f036d06e5 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -133,6 +133,7 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
boolean generateUpdateBefore,
boolean needRetraction,
@Nullable Integer indexOfCountStar,
+ @Nullable Long stateTtlFromHint,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -148,7 +149,8 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
generateUpdateBefore,
needRetraction,
indexOfCountStar,
- StateMetadata.getOneInputOperatorDefaultMeta(tableConfig,
STATE_NAME),
+ StateMetadata.getOneInputOperatorDefaultMeta(
+ stateTtlFromHint, tableConfig, STATE_NAME),
Collections.singletonList(inputProperty),
outputType,
description);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index cdcb725aca9..cb5a25ed913 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -120,6 +120,7 @@ public class StreamExecGroupAggregate extends
StreamExecAggregateBase {
boolean[] aggCallNeedRetractions,
boolean generateUpdateBefore,
boolean needRetraction,
+ @Nullable Long stateTtlFromHint,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -132,7 +133,8 @@ public class StreamExecGroupAggregate extends
StreamExecAggregateBase {
aggCallNeedRetractions,
generateUpdateBefore,
needRetraction,
- StateMetadata.getOneInputOperatorDefaultMeta(tableConfig,
STATE_NAME),
+ StateMetadata.getOneInputOperatorDefaultMeta(
+ stateTtlFromHint, tableConfig, STATE_NAME),
Collections.singletonList(inputProperty),
outputType,
description);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index 36d20f764d4..fb7d767536b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -126,6 +126,7 @@ public class StreamExecIncrementalGroupAggregate extends
StreamExecAggregateBase
boolean[] partialAggCallNeedRetractions,
RowType partialLocalAggInputType,
boolean partialAggNeedRetraction,
+ @Nullable Long stateTtlFromHint,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -140,7 +141,8 @@ public class StreamExecIncrementalGroupAggregate extends
StreamExecAggregateBase
partialAggCallNeedRetractions,
partialLocalAggInputType,
partialAggNeedRetraction,
- StateMetadata.getOneInputOperatorDefaultMeta(tableConfig,
STATE_NAME),
+ StateMetadata.getOneInputOperatorDefaultMeta(
+ stateTtlFromHint, tableConfig, STATE_NAME),
Collections.singletonList(inputProperty),
outputType,
description);
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
index b3b08fd1fef..8e08933db12 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.optimize;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.hint.FlinkHints;
import org.apache.flink.table.planner.hint.JoinStrategy;
@@ -27,6 +28,7 @@ import org.apache.flink.table.planner.hint.StateTtlHint;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.Hintable;
import org.apache.calcite.rel.hint.RelHint;
@@ -83,21 +85,50 @@ public class QueryHintsResolver extends
QueryHintsRelShuttle {
}
@Override
- protected RelNode visitBiRel(BiRel biRel) {
- Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
- Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
-
- Set<RelHint> existentKVHints = new HashSet<>();
-
- List<RelHint> oldHints = ((Hintable) biRel).getHints();
+ protected RelNode doVisit(RelNode node) {
+ List<RelHint> oldHints = ((Hintable) node).getHints();
List<RelHint> oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
// has no hints, return directly.
if (oldQueryHints.isEmpty()) {
- return super.visitChildren(biRel);
+ return super.visitChildren(node);
}
- List<RelHint> newHints = new ArrayList<>();
+ final List<RelHint> newHints;
+ if (node instanceof BiRel) {
+ BiRel biRel = (BiRel) node;
+ Optional<String> leftName =
extractAliasOrTableName(biRel.getLeft());
+ Optional<String> rightName =
extractAliasOrTableName(biRel.getRight());
+ newHints = validateAndGetNewHints(leftName, rightName, oldHints);
+ } else if (node instanceof SingleRel) {
+ SingleRel singleRel = (SingleRel) node;
+ Optional<String> tableName =
extractAliasOrTableName(singleRel.getInput());
+ newHints = validateAndGetNewHints(tableName, oldHints);
+ } else {
+ throw new TableException(
+ String.format(
+ "Unsupported node when resolving query hints: %s",
+ node.getClass().getCanonicalName()));
+ }
+
+ RelNode newNode = super.visitChildren(node);
+ List<RelHint> mergedHints = mergeQueryHintsIfNecessary(newHints);
+ // replace new query hints
+ return ((Hintable) newNode).withHints(mergedHints);
+ }
+
+ /**
+ * Resolve the query hints in the {@link BiRel} such as {@link
+ * org.apache.calcite.rel.core.Correlate} and {@link
org.apache.calcite.rel.core.Join}.
+ *
+ * @param leftName left table name, view name or alias name
+ * @param rightName right table name, view name or alias name
+ * @param oldHints old hints in this node
+ */
+ private List<RelHint> validateAndGetNewHints(
+ Optional<String> leftName, Optional<String> rightName,
List<RelHint> oldHints) {
+ Set<RelHint> existentKVHints = new HashSet<>();
+ List<RelHint> newHints = new ArrayList<>();
for (RelHint hint : oldHints) {
if (JoinStrategy.isLookupHint(hint.hintName)) {
allHints.add(trimInheritPath(hint));
@@ -156,12 +187,43 @@ public class QueryHintsResolver extends
QueryHintsRelShuttle {
}
}
}
+ return newHints;
+ }
- RelNode newNode = super.visitChildren(biRel);
+ /**
+ * Resolve the query hints in the {@link SingleRel} such as {@link
+ * org.apache.calcite.rel.core.Aggregate}.
+ *
+ * @param inputName the input table name, view name or alias name
+ * @param oldHints old hints in this node
+ */
+ private List<RelHint> validateAndGetNewHints(
+ Optional<String> inputName, List<RelHint> oldHints) {
+ Set<RelHint> existentKVHints = new HashSet<>();
- newHints = mergeQueryHintsIfNecessary(newHints);
- // replace new query hints
- return ((Hintable) newNode).withHints(newHints);
+ List<RelHint> newHints = new ArrayList<>();
+
+ for (RelHint hint : oldHints) {
+ if (StateTtlHint.isStateTtlHint(hint.hintName)) {
+ List<String> definedTables = new
ArrayList<>(hint.kvOptions.keySet());
+ initOptionInfoAboutQueryHintsForCheck(hint.hintName,
definedTables);
+ // the kv options will be converted to list options
+ List<String> newListOptions =
+ getNewStateTtlHintOptions(inputName, hint.kvOptions,
hint.hintName);
+ if (!newListOptions.isEmpty()) {
+ // only accept a matched hint
+ validHints.add(trimInheritPath(hint));
+ newHints.add(
+
RelHint.builder(hint.hintName).hintOptions(newListOptions).build());
+ }
+ } else {
+ if (!existentKVHints.contains(hint)) {
+ existentKVHints.add(hint);
+ newHints.add(hint);
+ }
+ }
+ }
+ return newHints;
}
private List<String> getNewJoinHintOptions(
@@ -228,6 +290,19 @@ public class QueryHintsResolver extends
QueryHintsRelShuttle {
return newOptions;
}
+ /** The state ttl hint for {@link SingleRel} will be converted to a list
option. */
+ private List<String> getNewStateTtlHintOptions(
+ Optional<String> inputName, Map<String, String> kvOptions, String
hintName) {
+ updateInfoForOptionCheck(hintName, inputName);
+ return kvOptions.entrySet().stream()
+ .filter(
+ entry ->
+ inputName.isPresent()
+ && matchIdentifier(entry.getKey(),
inputName.get()))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
private void validateHints() {
Set<RelHint> invalidHints = new HashSet<>(allHints);
invalidHints.removeAll(validHints);
@@ -389,32 +464,53 @@ public class QueryHintsResolver extends
QueryHintsRelShuttle {
private List<RelHint> mergeQueryHintsIfNecessary(List<RelHint> hints) {
List<RelHint> result = new ArrayList<>();
Map<String, Map<String, String>> kvHintsMap = new HashMap<>();
+ Map<String, String> listHintsMap = new HashMap<>();
for (RelHint hint : hints) {
String hintName = hint.hintName;
- // if the hint is not KV hint, add it directly
- if (!FlinkHints.isKVQueryHint(hintName)) {
+ // if the hint is a join hint or alias hint, add it directly
+ if (JoinStrategy.isJoinStrategy(hintName) ||
FlinkHints.isAliasHint(hintName)) {
result.add(hint);
continue;
}
- // if the hint is KV hint, merge it with the existing hints
- Map<String, String> kvOptions = new HashMap<>(hint.kvOptions);
- if (kvHintsMap.containsKey(hintName)) {
- Map<String, String> existingOptions = kvHintsMap.get(hintName);
- for (String key : kvOptions.keySet()) {
- // if the key is same, choose the first hint to take effect
- existingOptions.computeIfAbsent(key, k ->
kvOptions.get(key));
+ if (!hint.kvOptions.isEmpty()) {
+ // if the hint is KV hint like lookup hint and state ttl hint
on BiRel, merge it
+ // with the existing hints
+ Map<String, String> kvOptions = new HashMap<>(hint.kvOptions);
+ if (kvHintsMap.containsKey(hintName)) {
+ Map<String, String> existingOptions =
kvHintsMap.get(hintName);
+ for (String key : kvOptions.keySet()) {
+ // if the key is same, choose the first hint to take
effect
+ existingOptions.computeIfAbsent(key, k ->
kvOptions.get(key));
+ }
+ } else {
+ kvHintsMap.put(hintName, kvOptions);
}
+ } else if (!hint.listOptions.isEmpty()) {
+ // if the hint is LIST hint like state ttl hint on SingleRel,
choose the first hint
+ // to take effect
+ listHintsMap.computeIfAbsent(hintName, k ->
hint.listOptions.get(0));
} else {
- kvHintsMap.put(hintName, kvOptions);
+ // throw an exception again although empty options may be
checked by different
+ // checkers in FlinkHintStrategies before
+ throw new ValidationException(
+ String.format(
+ "Invalid %s hint, the key-value options and
list options are all empty",
+ hintName));
}
}
for (String kvHintName : kvHintsMap.keySet()) {
result.add(RelHint.builder(kvHintName).hintOptions(kvHintsMap.get(kvHintName)).build());
}
+ for (String listHintName : listHintsMap.keySet()) {
+ result.add(
+ RelHint.builder(listHintName)
+
.hintOptions(Collections.singletonList(listHintsMap.get(listHintName)))
+ .build());
+ }
return result;
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
index 652a177bb98..21a8476b77b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkLogicalRelFactories.scala
@@ -133,7 +133,7 @@ object FlinkLogicalRelFactories {
groupSet: ImmutableBitSet,
groupSets: ImmutableList[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): RelNode = {
- FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls)
+ FlinkLogicalAggregate.create(input, groupSet, groupSets, aggCalls, hints)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
index 0c9c1f2ca03..1abd1dc59f0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.planner.plan.nodes.logical
+import org.apache.flink.table.planner.JList
import org.apache.flink.table.planner.plan.PartialFinalType
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.utils.AggregateUtil
@@ -48,15 +49,9 @@ class FlinkLogicalAggregate(
groupSets: util.List[ImmutableBitSet],
aggCalls: util.List[AggregateCall],
/* flag indicating whether to skip SplitAggregateRule */
- var partialFinalType: PartialFinalType = PartialFinalType.NONE)
- extends Aggregate(
- cluster,
- traitSet,
- Collections.emptyList[RelHint](),
- child,
- groupSet,
- groupSets,
- aggCalls)
+ var partialFinalType: PartialFinalType = PartialFinalType.NONE,
+ hints: JList[RelHint] = Collections.emptyList())
+ extends Aggregate(cluster, traitSet, hints, child, groupSet, groupSets,
aggCalls)
with FlinkLogicalRel {
def setPartialFinalType(partialFinalType: PartialFinalType): Unit = {
@@ -76,7 +71,9 @@ class FlinkLogicalAggregate(
groupSet,
groupSets,
aggCalls,
- partialFinalType)
+ partialFinalType,
+ getHints
+ )
}
override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery):
RelOptCost = {
@@ -118,7 +115,12 @@ private class FlinkLogicalAggregateBatchConverter(config:
ConverterRule.Config)
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalAggregate]
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalAggregate.create(newInput, agg.getGroupSet, agg.getGroupSets,
agg.getAggCallList)
+ FlinkLogicalAggregate.create(
+ newInput,
+ agg.getGroupSet,
+ agg.getGroupSets,
+ agg.getAggCallList,
+ agg.getHints)
}
}
@@ -139,7 +141,12 @@ private class FlinkLogicalAggregateStreamConverter(config:
ConverterRule.Config)
override def convert(rel: RelNode): RelNode = {
val agg = rel.asInstanceOf[LogicalAggregate]
val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalAggregate.create(newInput, agg.getGroupSet, agg.getGroupSets,
agg.getAggCallList)
+ FlinkLogicalAggregate.create(
+ newInput,
+ agg.getGroupSet,
+ agg.getGroupSets,
+ agg.getAggCallList,
+ agg.getHints)
}
}
@@ -168,9 +175,19 @@ object FlinkLogicalAggregate {
input: RelNode,
groupSet: ImmutableBitSet,
groupSets: util.List[ImmutableBitSet],
- aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
+ aggCalls: util.List[AggregateCall],
+ hints: JList[RelHint]): FlinkLogicalAggregate = {
val cluster = input.getCluster
val traitSet = cluster.traitSetOf(FlinkConventions.LOGICAL).simplify()
- new FlinkLogicalAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls)
+ new FlinkLogicalAggregate(
+ cluster,
+ traitSet,
+ input,
+ groupSet,
+ groupSets,
+ aggCalls,
+ PartialFinalType.NONE,
+ hints
+ )
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
index d98eb3c8435..0ae4f46b891 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
@@ -17,7 +17,9 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.planner.JList
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.hint.StateTtlHint
import org.apache.flink.table.planner.plan.PartialFinalType
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate
@@ -28,6 +30,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.hint.RelHint
+
+import java.util.Collections
/**
* Stream physical RelNode for unbounded global group aggregate.
@@ -46,8 +51,9 @@ class StreamPhysicalGlobalGroupAggregate(
val localAggInputRowType: RelDataType,
val needRetraction: Boolean,
val partialFinalType: PartialFinalType,
- indexOfCountStar: Option[Int] = Option.empty)
- extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, grouping, aggCalls) {
+ indexOfCountStar: Option[Int] = Option.empty,
+ hints: JList[RelHint] = Collections.emptyList())
+ extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, grouping, aggCalls, hints) {
// if the indexOfCountStar is valid, the needRetraction should be true
require(indexOfCountStar.isEmpty || indexOfCountStar.get >= 0 && needRetraction)
@@ -90,7 +96,8 @@ class StreamPhysicalGlobalGroupAggregate(
localAggInputRowType,
needRetraction,
partialFinalType,
- indexOfCountStar)
+ indexOfCountStar,
+ hints)
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -123,6 +130,7 @@ class StreamPhysicalGlobalGroupAggregate(
generateUpdateBefore,
needRetraction,
indexOfCountStar.map(Integer.valueOf).orNull,
+ StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
index e5c73784f3a..cab8c770d66 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregate.scala
@@ -17,7 +17,9 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.planner.JList
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.hint.StateTtlHint
import org.apache.flink.table.planner.plan.PartialFinalType
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate
@@ -28,8 +30,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.hint.RelHint
import java.util
+import java.util.Collections
/**
* Stream physical RelNode for unbounded group aggregate.
@@ -46,8 +50,9 @@ class StreamPhysicalGroupAggregate(
outputRowType: RelDataType,
grouping: Array[Int],
aggCalls: Seq[AggregateCall],
- var partialFinalType: PartialFinalType = PartialFinalType.NONE)
- extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, grouping, aggCalls) {
+ var partialFinalType: PartialFinalType = PartialFinalType.NONE,
+ hints: JList[RelHint] = Collections.emptyList())
+ extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel, grouping, aggCalls, hints) {
private val aggInfoList =
AggregateUtil.deriveAggregateInfoList(this, grouping.length, aggCalls)
@@ -64,7 +69,8 @@ class StreamPhysicalGroupAggregate(
outputRowType,
grouping,
aggCalls,
- partialFinalType)
+ partialFinalType,
+ hints)
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -91,6 +97,7 @@ class StreamPhysicalGroupAggregate(
aggCallNeedRetractions,
generateUpdateBefore,
needRetraction,
+ StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
index 1bea3f0b01d..ca0315231ca 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGroupAggregateBase.scala
@@ -17,9 +17,14 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.planner.JList
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.hint.RelHint
+
+import java.util.Collections
/**
* Base stream physical RelNode for unbounded group aggregate.
@@ -44,6 +49,7 @@ abstract class StreamPhysicalGroupAggregateBase(
traitSet: RelTraitSet,
inputRel: RelNode,
val grouping: Array[Int],
- val aggCalls: Seq[AggregateCall])
+ val aggCalls: Seq[AggregateCall],
+ val hints: JList[RelHint] = Collections.emptyList())
extends SingleRel(cluster, traitSet, inputRel)
with StreamPhysicalRel {}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
index cd4377760bb..718e3a76ca0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala
@@ -17,7 +17,9 @@
*/
package org.apache.flink.table.planner.plan.nodes.physical.stream
+import org.apache.flink.table.planner.JList
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.hint.StateTtlHint
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate
import org.apache.flink.table.planner.plan.utils._
@@ -27,8 +29,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.hint.RelHint
import java.util
+import java.util.Collections
/**
* Stream physical RelNode for unbounded incremental group aggregate.
@@ -68,13 +72,15 @@ class StreamPhysicalIncrementalGroupAggregate(
partialAggCallNeedRetractions: Array[Boolean],
partialAggNeedRetraction: Boolean,
partialLocalAggInputRowType: RelDataType,
- partialGlobalAggRowType: RelDataType)
+ partialGlobalAggRowType: RelDataType,
+ hints: JList[RelHint] = Collections.emptyList())
extends StreamPhysicalGroupAggregateBase(
cluster,
traitSet,
inputRel,
finalAggGrouping,
- finalAggCalls) {
+ finalAggCalls,
+ hints) {
private lazy val incrementalAggInfo =
AggregateUtil.createIncrementalAggInfoList(
unwrapTypeFactory(inputRel),
@@ -107,7 +113,8 @@ class StreamPhysicalIncrementalGroupAggregate(
partialAggCallNeedRetractions,
partialAggNeedRetraction,
partialLocalAggInputRowType,
- partialGlobalAggRowType)
+ partialGlobalAggRowType,
+ hints)
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -137,6 +144,7 @@ class StreamPhysicalIncrementalGroupAggregate(
partialAggCallNeedRetractions,
FlinkTypeFactory.toLogicalRowType(partialLocalAggInputRowType),
partialAggNeedRetraction,
+ StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index 9de16707e59..77774cd02a8 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -128,7 +128,7 @@ class StreamPhysicalJoin(
getUpsertKeys(right, joinSpec.getRightKeys),
InputProperty.DEFAULT,
InputProperty.DEFAULT,
- StateTtlHint.getStateTtlFromHint(getHints),
+ StateTtlHint.getStateTtlFromHintOnBiRel(getHints),
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription)
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index 61827342614..1e8caa75d17 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -314,7 +314,8 @@ class SplitAggregateRule
relBuilder.build(),
fullGroupSet,
ImmutableList.of[ImmutableBitSet](fullGroupSet),
- newPartialAggCalls)
+ newPartialAggCalls,
+ originalAggregate.getHints)
partialAggregate.setPartialFinalType(PartialFinalType.PARTIAL)
relBuilder.push(partialAggregate)
@@ -357,7 +358,8 @@ class SplitAggregateRule
relBuilder.build(),
SplitAggregateRule.remap(fullGroupSet, originalAggregate.getGroupSet),
SplitAggregateRule.remap(fullGroupSet,
Seq(originalAggregate.getGroupSet)),
- finalAggCalls
+ finalAggCalls,
+ originalAggregate.getHints
)
finalAggregate.setPartialFinalType(PartialFinalType.FINAL)
relBuilder.push(finalAggregate)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
index d04f0b9bbf3..2fb63c0bcb0 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
@@ -93,7 +93,9 @@ class IncrementalAggregateRule
partialGlobalAgg.aggCallNeedRetractions,
partialGlobalAgg.needRetraction,
partialLocalAggInputRowType,
- partialGlobalAgg.getRowType)
+ partialGlobalAgg.getRowType,
+ partialGlobalAgg.hints
+ )
val incrAggOutputRowType = incrAgg.getRowType
val newExchange = exchange.copy(exchange.getTraitSet, incrAgg,
exchange.distribution)
@@ -142,7 +144,8 @@ class IncrementalAggregateRule
finalGlobalAgg.localAggInputRowType,
partialGlobalAgg.needRetraction,
finalGlobalAgg.partialFinalType,
- partialGlobalAgg.globalAggInfoList.indexOfCountStar)
+ partialGlobalAgg.globalAggInfoList.indexOfCountStar,
+ finalGlobalAgg.hints)
}
call.transformTo(globalAgg)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index 92f1b76693e..3495ebea06e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -73,7 +73,8 @@ class StreamPhysicalGroupAggregateRule(config: Config)
extends ConverterRule(con
rel.getRowType,
agg.getGroupSet.toArray,
agg.getAggCallList,
- agg.partialFinalType)
+ agg.partialFinalType,
+ agg.getHints)
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
index 272aeb8dfbb..30a792f8383 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedAggregateRule.scala
@@ -137,7 +137,9 @@ class TwoStageOptimizedAggregateRule
aggCallNeedRetractions,
realInput.getRowType,
needRetraction,
- originalAgg.partialFinalType)
+ originalAgg.partialFinalType,
+ Option.empty,
+ originalAgg.hints)
call.transformTo(globalAgg)
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
index 62580a63b33..f767be6a8f7 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
@@ -21,10 +21,10 @@ import
org.apache.flink.table.planner.analyze.{FlinkStreamPlanAnalyzers, PlanAdv
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
+import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Correlate, Join, TableScan}
+import org.apache.calcite.rel.core.{Aggregate, Correlate, Join, TableScan}
import org.apache.calcite.rel.externalize.RelWriterImpl
import org.apache.calcite.rel.hint.Hintable
import org.apache.calcite.sql.SqlExplainLevel
@@ -150,6 +150,16 @@ class RelTreeWriterImpl(
printValues.add(Pair.of("stateTtlHints",
RelExplainUtil.hintsToString(stateTtlHints)))
}
+ case _: Aggregate | _: StreamPhysicalGroupAggregateBase =>
+ val aggHints =
+ rel match {
+ case aggregate: Aggregate => aggregate.getHints
+ case _ =>
rel.asInstanceOf[StreamPhysicalGroupAggregateBase].hints
+ }
+ val stateTtlHints = FlinkHints.getAllStateTtlHints(aggHints)
+ if (stateTtlHints.nonEmpty) {
+ printValues.add(Pair.of("stateTtlHints",
RelExplainUtil.hintsToString(stateTtlHints)))
+ }
case _ => // ignore
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
index 4fd658cf97a..af3d9bbdde1 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearQueryHintsWithInvalidPropagationShuttleTestBase.java
@@ -112,6 +112,19 @@ abstract class
ClearQueryHintsWithInvalidPropagationShuttleTestBase extends Tabl
+ isBatchMode()
+ "'\n"
+ ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE t4 (\n"
+ + " a BIGINT,\n"
+ + " b BIGINT,\n"
+ + " c BIGINT\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = '"
+ + isBatchMode()
+ + "'\n"
+ + ")");
}
protected String buildRelPlanWithQueryBlockAlias(RelNode node) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
index c052338bcdf..6bd57e3d16b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.util.ImmutableBitSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -51,7 +53,7 @@ public class
ClearStateTtlHintsWithInvalidPropagationShuttleTest
}
@Test
- void testNoNeedToClearStateTtlHint() {
+ void testNoNeedToClearStateTtlHintOnJoin() {
// SELECT t4.a FROM (
// SELECT /*+ StaTe_TtL("t1" = "1d", "t2" = "7d")*/t1.a FROM t1
JOIN t2 ON t1.a = t2.a
// ) t4 JOIN t3 ON t4.a = t3.a
@@ -82,7 +84,7 @@ public class
ClearStateTtlHintsWithInvalidPropagationShuttleTest
}
@Test
- void testClearStateTtlHint() {
+ void testClearStateTtlHintOnJoin() {
// SELECT /*+ StaTe_TtL("t4" = "9d", "t3" = "12d")*/t4.a FROM (
// SELECT /*+ StaTe_TtL("t1" = "1d", "t2" = "7d")*/t1.a FROM t1
JOIN t2 ON t1.a = t2.a
// ) t4 JOIN t3 ON t4.a = t3.a
@@ -119,4 +121,72 @@ public class
ClearStateTtlHintsWithInvalidPropagationShuttleTest
verifyRelPlan(root);
}
+
+ @Test
+ void testNoNeedToClearStateTtlHintOnAggregate() {
+ // SELECT tmp.a, max(tmp.cnt) as max FROM (
+ // SELECT /*+ StaTe_TtL("t4" = "1d")*/t4.a, t4.b, count(t4.c) as
cnt
+ // FROM t4 GROUP BY t4.a, t4.b
+ // ) tmp GROUP BY tmp.a
+ Map<String, String> hintOptions = new HashMap<>();
+ hintOptions.put("t4", "1d");
+
+ RelHint stateTtlHint =
RelHint.builder("StaTe_TtL").hintOptions(hintOptions).build();
+
+ RelHint aliasHint =
RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("tmp").build();
+ RelNode root =
+ builder.scan("t4")
+ .aggregate(
+ builder.groupKey(ImmutableBitSet.of(0, 1)),
+ Collections.singletonList(
+ builder.count(builder.field(1, 0,
"c")).as("cnt")))
+ .hints(stateTtlHint, aliasHint)
+ .project(builder.field(1, 0, "a"), builder.field(1, 0,
"cnt"))
+ .aggregate(
+ builder.groupKey(ImmutableBitSet.of(0)),
+ Collections.singletonList(
+ builder.max(builder.field(1, 0,
"cnt")).as("max")))
+ .project(builder.field(1, 0, "a"), builder.field(1, 0,
"max"))
+ .build();
+
+ verifyRelPlan(root);
+ }
+
+ @Test
+ void testClearStateTtlHintOnAggregate() {
+ // SELECT /*+ StaTe_TtL("tmp" = "2d")*/tmp.a, max(tmp.cnt) as max
FROM (
+ // SELECT /*+ StaTe_TtL("t4" = "1d")*/t4.a, t4.b, count(t4.c) as
cnt
+ // FROM t4 GROUP BY t4.a, t4.b
+ // ) tmp GROUP BY tmp.a
+ Map<String, String> hintOptionsInner = new HashMap<>();
+ hintOptionsInner.put("t4", "1d");
+
+ RelHint stateTtlHintInner =
+
RelHint.builder("StaTe_TtL").hintOptions(hintOptionsInner).build();
+
+ Map<String, String> hintOptionsOuter = new HashMap<>();
+ hintOptionsOuter.put("tmp", "2d");
+
+ RelHint stateTtlHintOuter =
+
RelHint.builder("StaTe_TtL").hintOptions(hintOptionsOuter).build();
+
+ RelHint aliasHint =
RelHint.builder(FlinkHints.HINT_ALIAS).hintOption("tmp").build();
+ RelNode root =
+ builder.scan("t4")
+ .aggregate(
+ builder.groupKey(ImmutableBitSet.of(0, 1)),
+ Collections.singletonList(
+ builder.count(builder.field(1, 0,
"c")).as("cnt")))
+ .hints(stateTtlHintInner, aliasHint)
+ .project(builder.field(1, 0, "a"), builder.field(1, 0,
"cnt"))
+ .aggregate(
+ builder.groupKey(ImmutableBitSet.of(0)),
+ Collections.singletonList(
+ builder.max(builder.field(1, 0,
"cnt")).as("max")))
+ .project(builder.field(1, 0, "a"), builder.field(1, 0,
"max"))
+ .hints(stateTtlHintOuter)
+ .build();
+
+ verifyRelPlan(root);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
index 36c30bd740d..674fdc602ba 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.java
@@ -70,6 +70,10 @@ class StateTtlHintTest extends TableTestBase {
util.tableEnv()
.executeSql("create view V5 as select T1.* from T1 join T2 on
T1.a1 = T2.a2");
+
+ util.tableEnv()
+ .executeSql(
+ "create view V6 as select a1, b1, count(*) as cnt from
T1 group by a1, b1");
}
@Test
@@ -193,7 +197,104 @@ class StateTtlHintTest extends TableTestBase {
assertThatThrownBy(() -> verify(sql))
.isInstanceOf(AssertionError.class)
.hasMessageContaining(
- "Invalid hint about STATE_TTL, expecting at least one
key-value options specified.");
+ "Invalid STATE_TTL hint, expecting at least one
key-value options specified.");
+ }
+
+ @Test
+ void testSimpleAggStateTtl() {
+ String sql = "select /*+ STATE_TTL('T1' = '2d') */ count(*) from T1
group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlWithUnknownTable() {
+ String sql = "select /*+ STATE_TTL('T2' = '2d') */ count(*) from T1
group by a1";
+ assertThatThrownBy(() -> verify(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The options of following hints cannot match the name
of input tables or views: \n`%s` in `%s`",
+ "T2", "STATE_TTL");
+ }
+
+ @Test
+ void testAggStateTtlWithView() {
+ String sql = "select /*+ STATE_TTL('V6' = '2d') */ max(b1) from V6
group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlWithUnknownView() {
+ String sql =
+ "select /*+ STATE_TTL('T1' = '2d') */ max(b1) from "
+ + "(select a1, b1, count(*) from T1 group by a1, b1)
TMP group by a1";
+ assertThatThrownBy(() -> verify(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "The options of following hints cannot match the name
of input tables or views: \n`%s` in `%s`",
+ "T1", "STATE_TTL");
+ }
+
+ @Test
+ void testMultiAggStateTtl() {
+ String sql =
+ "select /*+ STATE_TTL('T1' = '2d'), STATE_TTL('T1' = '8d') */
count(*) from T1 group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testSingleAggStateTtlWithMultiKV() {
+ String sql =
+ "select /*+ STATE_TTL('T1' = '2d', 'T1' = '8d') */ count(*)
from T1 group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlWithCascadeAgg() {
+ String sql =
+ "select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from "
+ + "(select /*+ STATE_TTL('T1' = '4d') */ a1, b1,
count(*) from T1 group by a1, b1) TMP group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlNotPropagateOutOfView() {
+ String sql =
+ "select max(b1) from "
+ + "(select /*+ STATE_TTL('T1' = '4d') */ a1, b1,
count(*) from T1 group by a1, b1) TMP group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlNotPropagateIntoView() {
+ String sql =
+ "select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from "
+ + "(select a1, b1, count(*) from T1 group by a1, b1)
TMP group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlWithJoin() {
+ String sql =
+ "select /*+ STATE_TTL('T1' = '2d') */ max(b1) from "
+ + "(select T1.* from T1 join T2 on T1.a1 = T2.a2) T1
group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlWithJoinHint() {
+ String sql =
+ "select /*+ STATE_TTL('T1' = '2d') */ max(b1) from "
+ + "(select /*+ BROADCAST(T1) */T1.* from T1 join T2
on T1.a1 = T2.a2) T1 group by a1";
+ verify(sql);
+ }
+
+ @Test
+ void testAggStateTtlWithEmptyKV() {
+ String sql = "select /*+ STATE_TTL() */ max(b1) from T1 group by a1";
+ assertThatThrownBy(() -> verify(sql))
+ .isInstanceOf(AssertionError.class)
+ .hasMessageContaining(
+ "Invalid STATE_TTL hint, expecting at least one
key-value options specified.");
}
private void verify(String sql) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
index 6a146cb7448..b5a9d8fdf2d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.java
@@ -93,11 +93,13 @@ public class StateMetadataTest {
@ParameterizedTest
public void testGetOneInputOperatorDefaultMeta(
Consumer<TableConfig> configModifier,
+ @Nullable Long stateTtlFromHint,
String expectedStateName,
long expectedTtlMillis) {
configModifier.accept(tableConfig);
List<StateMetadata> stateMetadataList =
- StateMetadata.getOneInputOperatorDefaultMeta(tableConfig,
expectedStateName);
+ StateMetadata.getOneInputOperatorDefaultMeta(
+ stateTtlFromHint, tableConfig, expectedStateName);
assertThat(stateMetadataList).hasSize(1);
assertThat(stateMetadataList.get(0))
.isEqualTo(
@@ -173,12 +175,22 @@ public class StateMetadataTest {
public static Stream<Arguments> provideConfigForOneInput() {
return Stream.of(
- Arguments.of((Consumer<TableConfig>) config -> {}, "fooState",
0L),
+ Arguments.of((Consumer<TableConfig>) config -> {}, null,
"fooState", 0L),
Arguments.of(
(Consumer<TableConfig>)
config -> config.set(IDLE_STATE_RETENTION,
Duration.ofMinutes(10)),
+ null,
"barState",
- 600000L));
+ 600000L),
+ Arguments.of((Consumer<TableConfig>) config -> {}, 100L,
"bazState", 100L),
+ // state ttl from the hint gets a higher priority over
job-level
+ // table.exec.state.ttl
+ Arguments.of(
+ (Consumer<TableConfig>)
+ config -> config.set(IDLE_STATE_RETENTION,
Duration.ofMinutes(10)),
+ 200L,
+ "quxState",
+ 200L));
}
public static Stream<Arguments> provideConfigForMultiInput() {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
index 2c5043d9f4d..300cf8ca012 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateRestoreTest.java
@@ -41,6 +41,7 @@ public class GroupAggregateRestoreTest extends
RestoreTestBase {
GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE,
GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE_MINI_BATCH,
GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE,
-
GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE_MINI_BATCH);
+
GroupAggregateTestPrograms.GROUP_BY_UDF_WITHOUT_MERGE_MINI_BATCH,
+ GroupAggregateTestPrograms.AGG_WITH_STATE_TTL_HINT);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
index b6a61a72144..6c43cf90959 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java
@@ -371,4 +371,33 @@ public class GroupAggregateTestPrograms {
.build())
.runSql(GROUP_BY_UDF_WITHOUT_MERGE.getRunSqlTestStep().sql)
.build();
+
+ static final TableTestProgram AGG_WITH_STATE_TTL_HINT =
+ TableTestProgram.of("agg-with-state-ttl-hint", "agg with state ttl
hint")
+ .setupTableSource(SOURCE_ONE)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "b BIGINT",
+ "cnt BIGINT",
+ "avg_a DOUBLE",
+ "min_c VARCHAR",
+ "PRIMARY KEY (b) NOT ENFORCED")
+ .consumedBeforeRestore(
+ "+I[1, 1, null, Hi]",
+ "+I[2, 1, 2.0, Hello]",
+ "+U[2, 2, 2.0, Hello]")
+ .consumedAfterRestore(
+ "+U[1, 2, null, Hi]",
+ "+U[2, 3, 2.0, Hello]",
+ "+U[2, 4, 2.0, Hello]")
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT /*+
STATE_TTL('source_t' = '4d') */"
+ + "b, "
+ + "COUNT(*) AS cnt, "
+ + "AVG(a) FILTER (WHERE a > 1) AS avg_a, "
+ + "MIN(c) AS min_c "
+ + "FROM source_t GROUP BY b")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
index ba5f48faaa7..0efeccb952f 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java
@@ -43,7 +43,8 @@ import java.util.function.Function;
class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {
@Test
- void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
+ void
testDifferentStateTtlThroughCompiledPlanForDifferentOneInputStreamOperator()
+ throws Exception {
innerTestDeduplicateAndGroupAggregate(
"INSERT INTO OrdersStats \n"
+ "SELECT buyer, COUNT(1) AS ord_cnt, SUM(quantity) AS
quantity_cnt, SUM(amount) AS total_amount FROM (\n"
@@ -65,7 +66,32 @@ class ConfigureOperatorLevelStateTtlJsonITCase extends
JsonPlanTestBase {
}
@Test
- void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception
{
+ void
testDifferentStateTtlThroughSqlHintForDifferentOneInputStreamOperator() throws
Exception {
+ tableEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
+ tableEnv.getConfig().set("table.exec.mini-batch.size", "2");
+ tableEnv.getConfig().set("table.exec.mini-batch.allow-latency", "1");
+ innerTestDeduplicateAndGroupAggregate(
+ "INSERT INTO OrdersStats \n"
+ + "SELECT /*+STATE_TTL('tmp' = '9s')*/ buyer, COUNT(1)
AS ord_cnt, SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount \n"
+ + "FROM (\n"
+ + " SELECT *, ROW_NUMBER() OVER(PARTITION BY
order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders\n"
+ + ") tmp\n"
+ + "WHERE rk = 1\n"
+ + "GROUP BY buyer",
+ json -> {
+ try {
+ JsonNode target = JsonTestUtils.readFromString(json);
+ JsonTestUtils.setExecNodeStateMetadata(
+ target, "stream-exec-deduplicate", 0, 6000L);
+ return JsonTestUtils.writeToString(target);
+ } catch (IOException e) {
+ throw new TableException("Cannot modify compiled json
plan.", e);
+ }
+ });
+ }
+
+ @Test
+ void
testDifferentStateTtlThroughCompiledPlanForSameTwoInputStreamOperator() throws
Exception {
innerTestRegularJoin(
"INSERT INTO OrdersShipInfo \n"
+ "SELECT a.order_id, a.line_order_id, b.ship_mode
FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id",
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
index 5d2f1a825d6..e33b11a557b 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/hint/ClearStateTtlHintsWithInvalidPropagationShuttleTest.xml
@@ -16,7 +16,41 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testClearStateTtlHint">
+ <TestCase name="testClearStateTtlHintOnAggregate">
+ <Resource name="beforePropagatingHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[StaTe_TtL
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[StaTe_TtL options:{t4=1d}]]], hints=[[[ALIAS
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ <Resource name="afterPropagatingHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[StaTe_TtL
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[StaTe_TtL options:{t4=1d}][StaTe_TtL inheritPath:[0, 0]
options:{tmp=2d}]]], hints=[[[ALIAS options:[tmp]]]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ <Resource name="afterCapitalizeJoinHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[STATE_TTL
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[STATE_TTL options:{t4=1d}][STATE_TTL inheritPath:[0, 0]
options:{tmp=2d}]]], hints=[[[ALIAS options:[tmp]]]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ <Resource name="afterClearingJoinHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)], stateTtlHints=[[[STATE_TTL
options:{tmp=2d}]]]), rowType=[RecordType(BIGINT a, BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[STATE_TTL options:{t4=1d}]]], hints=[[[ALIAS
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNoNeedToClearStateTtlHintOnJoin">
<Resource name="beforePropagatingHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
@@ -31,9 +65,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
<Resource name="afterPropagatingHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[StAte_tTl inheritPath:[0] options:{t4=9d, t3=12d}]]]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
:- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]),
rowType=[RecordType(BIGINT a)]
- : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}][StAte_tTl
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+ : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}]]],
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a,
BIGINT a0)]
: :- LogicalTableScan(table=[[builtin, default, t1]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalTableScan(table=[[builtin, default, t2]]),
rowType=[RecordType(BIGINT a)]
+- LogicalTableScan(table=[[builtin, default, t3]]),
rowType=[RecordType(BIGINT a)]
@@ -42,9 +76,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
<Resource name="afterCapitalizeJoinHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
:- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]),
rowType=[RecordType(BIGINT a)]
- : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}][STATE_TTL
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
+ : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]],
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a,
BIGINT a0)]
: :- LogicalTableScan(table=[[builtin, default, t1]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalTableScan(table=[[builtin, default, t2]]),
rowType=[RecordType(BIGINT a)]
+- LogicalTableScan(table=[[builtin, default, t3]]),
rowType=[RecordType(BIGINT a)]
@@ -53,7 +87,7 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
<Resource name="afterClearingJoinHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
:- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]],
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a,
BIGINT a0)]
: :- LogicalTableScan(table=[[builtin, default, t1]]),
rowType=[RecordType(BIGINT a)]
@@ -62,7 +96,7 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
]]>
</Resource>
</TestCase>
- <TestCase name="testNoNeedToClearStateTtlHint">
+ <TestCase name="testClearStateTtlHintOnJoin">
<Resource name="beforePropagatingHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
@@ -77,9 +111,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
<Resource name="afterPropagatingHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[StAte_tTl inheritPath:[0] options:{t4=9d, t3=12d}]]]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
:- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]),
rowType=[RecordType(BIGINT a)]
- : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}]]],
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a,
BIGINT a0)]
+ : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[StaTe_TtL inheritPath:[0] options:{t1=1d, t2=7d}][StAte_tTl
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
: :- LogicalTableScan(table=[[builtin, default, t1]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalTableScan(table=[[builtin, default, t2]]),
rowType=[RecordType(BIGINT a)]
+- LogicalTableScan(table=[[builtin, default, t3]]),
rowType=[RecordType(BIGINT a)]
@@ -88,9 +122,9 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
<Resource name="afterCapitalizeJoinHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
:- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]),
rowType=[RecordType(BIGINT a)]
- : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]],
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a,
BIGINT a0)]
+ : +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}][STATE_TTL
inheritPath:[0, 0, 0] options:{t4=9d, t3=12d}]]], hints=[[[ALIAS
inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a, BIGINT a0)]
: :- LogicalTableScan(table=[[builtin, default, t1]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalTableScan(table=[[builtin, default, t2]]),
rowType=[RecordType(BIGINT a)]
+- LogicalTableScan(table=[[builtin, default, t3]]),
rowType=[RecordType(BIGINT a)]
@@ -99,12 +133,46 @@ LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
<Resource name="afterClearingJoinHints">
<![CDATA[
LogicalProject(a=[$0]), rowType=[RecordType(BIGINT a)]
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t4=9d, t3=12d}]]]),
rowType=[RecordType(BIGINT a, BIGINT a0)]
:- LogicalProject(a=[$0], hints=[[[ALIAS options:[t4]]]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalJoin(condition=[=($0, $1)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{t1=1d, t2=7d}]]],
hints=[[[ALIAS inheritPath:[0] options:[t4]]]]), rowType=[RecordType(BIGINT a,
BIGINT a0)]
: :- LogicalTableScan(table=[[builtin, default, t1]]),
rowType=[RecordType(BIGINT a)]
: +- LogicalTableScan(table=[[builtin, default, t2]]),
rowType=[RecordType(BIGINT a)]
+- LogicalTableScan(table=[[builtin, default, t3]]),
rowType=[RecordType(BIGINT a)]
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testNoNeedToClearStateTtlHintOnAggregate">
+ <Resource name="beforePropagatingHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a,
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[StaTe_TtL options:{t4=1d}]]], hints=[[[ALIAS
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ <Resource name="afterPropagatingHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a,
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[StaTe_TtL options:{t4=1d}]]], hints=[[[ALIAS
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ <Resource name="afterCapitalizeJoinHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a,
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[STATE_TTL options:{t4=1d}]]], hints=[[[ALIAS
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
+]]>
+ </Resource>
+ <Resource name="afterClearingJoinHints">
+ <![CDATA[
+LogicalAggregate(group=[{0}], max=[MAX($1)]), rowType=[RecordType(BIGINT a,
BIGINT max)]
++- LogicalProject(a=[$0], cnt=[$2]), rowType=[RecordType(BIGINT a, BIGINT cnt)]
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)],
stateTtlHints=[[[STATE_TTL options:{t4=1d}]]], hints=[[[ALIAS
options:[tmp]]]]), rowType=[RecordType(BIGINT a, BIGINT b, BIGINT cnt)]
+ +- LogicalTableScan(table=[[builtin, default, t4]]),
rowType=[RecordType(BIGINT a, BIGINT b, BIGINT c)]
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
index 5e525037b65..b395e6002b2 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/stream/StateTtlHintTest.xml
@@ -16,6 +16,187 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testAggStateTtlNotPropagateIntoView">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from (select a1,
b1, count(*) from T1 group by a1, b1) TMP group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{TMP=2d}]]])
+ +- LogicalProject(a1=[$0], b1=[$1])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], hints=[[[ALIAS
options:[TMP]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1])
+ +- Exchange(distribution=[hash[a1, b1]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAggStateTtlNotPropagateOutOfView">
+ <Resource name="sql">
+ <![CDATA[select max(b1) from (select /*+ STATE_TTL('T1' = '4d') */ a1,
b1, count(*) from T1 group by a1, b1) TMP group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)])
+ +- LogicalProject(a1=[$0], b1=[$1])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()],
stateTtlHints=[[[STATE_TTL options:{T1=4d}]]], hints=[[[ALIAS options:[TMP]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0])
+ +- Exchange(distribution=[hash[a1]])
+ +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1],
stateTtlHints=[[[STATE_TTL options:[4d]]]])
+ +- Exchange(distribution=[hash[a1, b1]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAggStateTtlWithCascadeAgg">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('TMP' = '2d') */ max(b1) from (select /*+
STATE_TTL('T1' = '4d') */ a1, b1, count(*) from T1 group by a1, b1) TMP group
by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{TMP=2d}]]])
+ +- LogicalProject(a1=[$0], b1=[$1])
+ +- LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()],
stateTtlHints=[[[STATE_TTL options:{T1=4d}]]], hints=[[[ALIAS options:[TMP]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1],
stateTtlHints=[[[STATE_TTL options:[4d]]]])
+ +- Exchange(distribution=[hash[a1, b1]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAggStateTtlWithJoin">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T1' = '2d') */ max(b1) from (select T1.*
from T1 join T2 on T1.a1 = T2.a2) T1 group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{T1=2d}]]])
+ +- LogicalProject(a1=[$0], b1=[$1], hints=[[[ALIAS options:[T1]]]])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], hints=[[[ALIAS
inheritPath:[0] options:[T1]]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- Calc(select=[a1, b1])
+ +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+ +- Exchange(distribution=[hash[a2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAggStateTtlWithJoinHint">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T1' = '2d') */ max(b1) from (select /*+
BROADCAST(T1) */T1.* from T1 join T2 on T1.a1 = T2.a2) T1 group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{T1=2d}]]])
+ +- LogicalProject(a1=[$0], b1=[$1], hints=[[[ALIAS options:[T1]]]])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]], hints=[[[ALIAS
inheritPath:[0] options:[T1]]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- Calc(select=[a1, b1])
+ +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
joinHints=[[[BROADCAST options:[LEFT]]]])
+ :- Exchange(distribution=[hash[a1]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+ +- Exchange(distribution=[hash[a2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testAggStateTtlWithView">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('V6' = '2d') */ max(b1) from V6 group by
a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[MAX($1)], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{V6=2d}]]])
+ +- LogicalProject(a1=[$0], b1=[$1])
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()], hints=[[[ALIAS
options:[V6]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, MAX(b1) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- GroupAggregate(groupBy=[a1, b1], select=[a1, b1])
+ +- Exchange(distribution=[hash[a1, b1]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testDuplicateJoinStateTtlHint">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d'), STATE_TTL('T1'
= '1d', 'T2' = '8d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 =
T3.b3]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5])
++- LogicalFilter(condition=[AND(=($0, $2), =($3, $5))])
+ +- LogicalJoin(condition=[true], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T2=2d, T3=3d}][STATE_TTL
inheritPath:[0, 0] options:{T1=1d, T2=8d}]]])
+ :- LogicalJoin(condition=[true], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0, 0] options:{T2=2d,
T3=3d}][STATE_TTL inheritPath:[0, 0, 0] options:{T1=1d, T2=8d}]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3]],
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Join(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{RIGHT=3d}]]])
+:- Exchange(distribution=[hash[b2]])
+: +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=2d}]]])
+: :- Exchange(distribution=[hash[a1]])
+: : +- TableSourceScan(table=[[default_catalog, default_database, T1]],
fields=[a1, b1])
+: +- Exchange(distribution=[hash[a2]])
+: +- TableSourceScan(table=[[default_catalog, default_database, T2]],
fields=[a2, b2])
++- Exchange(distribution=[hash[b3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3]],
fields=[a3, b3])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testJoinStateTtlHintNotPropagateIntoView">
<Resource name="sql">
<![CDATA[select /*+ STATE_TTL('T1' = '1d')*/T1.* from T1 join V5 on
T1.a1 = V5.a1]]>
@@ -108,7 +289,7 @@ Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1,
b1, a2, b2], leftInput
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], stateTtlHints=[[[STATE_TTL
options:{T1=1d, T2=2d}]]])
+- LogicalProject(b1=[$1], a1=[$0])
+- LogicalJoin(condition=[=($1, $3)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T1=1d, T2=2d}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
@@ -125,32 +306,6 @@ GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS
EXPR$1])
: +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+- Exchange(distribution=[hash[b2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2,
project=[b2], metadata=[]]], fields=[b2])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testStateTtlHintWithJoinHint">
- <Resource name="sql">
- <![CDATA[select /*+ STATE_TTL('T1' = '1d', 'T2' = '2d'), BROADCAST(T1)
*/T1.b1, sum(T1.a1) from T1 join T2 on T1.b1 = T2.b2 group by T1.b1]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
-+- LogicalProject(b1=[$1], a1=[$0])
- +- LogicalJoin(condition=[=($1, $3)], joinType=[inner],
joinHints=[[[BROADCAST inheritPath:[0, 0] options:[T1]]]],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T1=1d, T2=2d}]]])
- :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS EXPR$1])
-+- Exchange(distribution=[hash[b1]])
- +- Calc(select=[b1, a1])
- +- Join(joinType=[InnerJoin], where=[=(b1, b2)], select=[a1, b1, b2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
joinHints=[[[BROADCAST options:[LEFT]]]], stateTtlHints=[[[STATE_TTL
options:{LEFT=1d, RIGHT=2d}]]])
- :- Exchange(distribution=[hash[b1]])
- : +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
- +- Exchange(distribution=[hash[b2]])
- +- TableSourceScan(table=[[default_catalog, default_database, T2,
project=[b2], metadata=[]]], fields=[b2])
]]>
</Resource>
</TestCase>
@@ -202,6 +357,71 @@ Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1,
b1, a2, b2], leftInput
: +- TableSourceScan(table=[[default_catalog, default_database, T1]],
fields=[a1, b1])
+- Exchange(distribution=[hash[a2]])
+- TableSourceScan(table=[[default_catalog, default_database, T2]],
fields=[a2, b2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinStateTtlHintWithView">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T1' = '2d', 'V4' = '1d') */* from T1 join
V4 on T1.a1 = V4.a4]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a4=[$2], b4=[$3])
++- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{T1=2d, V4=1d}]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+ +- LogicalProject(a4=[$0], b4=[$1], hints=[[[ALIAS options:[V4]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, a3, b3],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{LEFT=2d, RIGHT=1d}]]])
+:- Exchange(distribution=[hash[a1]])
+: +- TableSourceScan(table=[[default_catalog, default_database, T1]],
fields=[a1, b1])
++- Exchange(distribution=[hash[a3]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T3]],
fields=[a3, b3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiAggStateTtl">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T1' = '2d'), STATE_TTL('T1' = '8d') */
count(*) from T1 group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{T1=2d}][STATE_TTL inheritPath:[0] options:{T1=8d}]]])
+ +- LogicalProject(a1=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, COUNT(*) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T1,
project=[a1], metadata=[]]], fields=[a1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSimpleAggStateTtl">
+ <Resource name="sql">
+ <![CDATA[select /*+ STATE_TTL('T1' = '2d') */ count(*) from T1 group by
a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{T1=2d}]]])
+ +- LogicalProject(a1=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, COUNT(*) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[2d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T1,
project=[a1], metadata=[]]], fields=[a1])
]]>
</Resource>
</TestCase>
@@ -227,55 +447,50 @@ Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1,
b1, a2, b2], leftInput
]]>
</Resource>
</TestCase>
- <TestCase name="testJoinStateTtlHintWithView">
+ <TestCase name="testSingleAggStateTtlWithMultiKV">
<Resource name="sql">
- <![CDATA[select /*+ STATE_TTL('T1' = '2d', 'V4' = '1d') */* from T1 join
V4 on T1.a1 = V4.a4]]>
+ <![CDATA[select /*+ STATE_TTL('T1' = '2d', 'T1' = '8d') */ count(*) from
T1 group by a1]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a4=[$2], b4=[$3])
-+- LogicalJoin(condition=[=($0, $2)], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0] options:{T1=2d, V4=1d}]]])
- :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
- +- LogicalProject(a4=[$0], b4=[$1], hints=[[[ALIAS options:[V4]]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, T3]])
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], stateTtlHints=[[[STATE_TTL
inheritPath:[0] options:{T1=8d}]]])
+ +- LogicalProject(a1=[$0])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, a3, b3],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{LEFT=2d, RIGHT=1d}]]])
-:- Exchange(distribution=[hash[a1]])
-: +- TableSourceScan(table=[[default_catalog, default_database, T1]],
fields=[a1, b1])
-+- Exchange(distribution=[hash[a3]])
- +- TableSourceScan(table=[[default_catalog, default_database, T3]],
fields=[a3, b3])
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[a1], select=[a1, COUNT(*) AS EXPR$0],
stateTtlHints=[[[STATE_TTL options:[8d]]]])
+ +- Exchange(distribution=[hash[a1]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T1,
project=[a1], metadata=[]]], fields=[a1])
]]>
</Resource>
</TestCase>
- <TestCase name="testDuplicateJoinStateTtlHint">
+ <TestCase name="testStateTtlHintWithJoinHint">
<Resource name="sql">
- <![CDATA[select /*+ STATE_TTL('T2' = '2d', 'T3' = '3d'), STATE_TTL('T1'
= '1d', 'T2' = '8d') */* from T1, T2, T3 where T1.a1 = T2.a2 and T2.b2 =
T3.b3]]>
+ <![CDATA[select /*+ STATE_TTL('T1' = '1d', 'T2' = '2d'), BROADCAST(T1)
*/T1.b1, sum(T1.a1) from T1 join T2 on T1.b1 = T2.b2 group by T1.b1]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5])
-+- LogicalFilter(condition=[AND(=($0, $2), =($3, $5))])
- +- LogicalJoin(condition=[true], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T2=2d, T3=3d}][STATE_TTL
inheritPath:[0, 0] options:{T1=1d, T2=8d}]]])
- :- LogicalJoin(condition=[true], joinType=[inner],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0, 0] options:{T2=2d,
T3=3d}][STATE_TTL inheritPath:[0, 0, 0] options:{T1=1d, T2=8d}]]])
- : :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
- +- LogicalTableScan(table=[[default_catalog, default_database, T3]],
hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], stateTtlHints=[[[STATE_TTL
options:{T1=1d, T2=2d}]]])
++- LogicalProject(b1=[$1], a1=[$0])
+ +- LogicalJoin(condition=[=($1, $3)], joinType=[inner],
joinHints=[[[BROADCAST inheritPath:[0, 0] options:[T1]]]],
stateTtlHints=[[[STATE_TTL inheritPath:[0, 0] options:{T1=1d, T2=2d}]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T1]],
hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]],
hints=[[[ALIAS inheritPath:[] options:[T2]]]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Join(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{RIGHT=3d}]]])
-:- Exchange(distribution=[hash[b2]])
-: +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
stateTtlHints=[[[STATE_TTL options:{LEFT=1d, RIGHT=2d}]]])
-: :- Exchange(distribution=[hash[a1]])
-: : +- TableSourceScan(table=[[default_catalog, default_database, T1]],
fields=[a1, b1])
-: +- Exchange(distribution=[hash[a2]])
-: +- TableSourceScan(table=[[default_catalog, default_database, T2]],
fields=[a2, b2])
-+- Exchange(distribution=[hash[b3]])
- +- TableSourceScan(table=[[default_catalog, default_database, T3]],
fields=[a3, b3])
+GroupAggregate(groupBy=[b1], select=[b1, SUM(a1) AS EXPR$1])
++- Exchange(distribution=[hash[b1]])
+ +- Calc(select=[b1, a1])
+ +- Join(joinType=[InnerJoin], where=[=(b1, b2)], select=[a1, b1, b2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey],
joinHints=[[[BROADCAST options:[LEFT]]]], stateTtlHints=[[[STATE_TTL
options:{LEFT=1d, RIGHT=2d}]]])
+ :- Exchange(distribution=[hash[b1]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
T1]], fields=[a1, b1])
+ +- Exchange(distribution=[hash[b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, T2,
project=[b2], metadata=[]]], fields=[b2])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/plan/agg-with-state-ttl-hint.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/plan/agg-with-state-ttl-hint.json
new file mode 100644
index 00000000000..cebe1e5898b
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/plan/agg-with-state-ttl-hint.json
@@ -0,0 +1,272 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 52,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a, b, c])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 53,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "POSTFIX",
+ "internalName" : "$IS TRUE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` BOOLEAN NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[b, a, (a > 1) IS TRUE AS $f2, c])"
+ }, {
+ "id" : 54,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` BOOLEAN NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 55,
+ "type" : "stream-exec-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "false",
+ "table.exec.mini-batch.size" : "-1"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "cnt",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "name" : "avg_a",
+ "internalName" : "$AVG$1",
+ "argList" : [ 1 ],
+ "filterArg" : 2,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "INT"
+ }, {
+ "name" : "min_c",
+ "internalName" : "$MIN$1",
+ "argList" : [ 3 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "aggCallNeedRetractions" : [ false, false, false ],
+ "generateUpdateBefore" : false,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "4 d",
+ "name" : "groupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `cnt` BIGINT NOT NULL, `avg_a` INT,
`min_c` VARCHAR(2147483647)>",
+ "description" : "GroupAggregate(groupBy=[b], select=[b, COUNT(*) AS cnt,
AVG(a) FILTER $f2 AS avg_a, MIN(c) AS min_c])"
+ }, {
+ "id" : 56,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ } ],
+ "type" : "DOUBLE"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `cnt` BIGINT, `avg_a` DOUBLE, `min_c`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[b, CAST(cnt AS BIGINT) AS cnt, CAST(avg_a AS
DOUBLE) AS avg_a, min_c])"
+ }, {
+ "id" : 57,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT NOT NULL"
+ }, {
+ "name" : "cnt",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "avg_a",
+ "dataType" : "DOUBLE"
+ }, {
+ "name" : "min_c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ],
+ "primaryKey" : {
+ "name" : "PK_b",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "b" ]
+ }
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER" ],
+ "inputUpsertKey" : [ 0 ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `cnt` BIGINT, `avg_a` DOUBLE, `min_c`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[b, cnt, avg_a, min_c])"
+ } ],
+ "edges" : [ {
+ "source" : 52,
+ "target" : 53,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 53,
+ "target" : 54,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 54,
+ "target" : 55,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 55,
+ "target" : 56,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 56,
+ "target" : 57,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/savepoint/_metadata
new file mode 100644
index 00000000000..50e4bee3a94
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-group-aggregate_1/agg-with-state-ttl-hint/savepoint/_metadata
differ