LadyForest commented on code in PR #24179:
URL: https://github.com/apache/flink/pull/24179#discussion_r1465877429
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java:
##########
@@ -83,21 +85,50 @@ final List<RelNode> resolve(List<RelNode> roots) {
}
@Override
- protected RelNode visitBiRel(BiRel biRel) {
- Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
- Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
-
- Set<RelHint> existentKVHints = new HashSet<>();
-
- List<RelHint> oldHints = ((Hintable) biRel).getHints();
+ protected RelNode doVisit(RelNode node) {
+ List<RelHint> oldHints = ((Hintable) node).getHints();
List<RelHint> oldQueryHints = FlinkHints.getAllQueryHints(oldHints);
// has no hints, return directly.
if (oldQueryHints.isEmpty()) {
- return super.visitChildren(biRel);
+ return super.visitChildren(node);
}
- List<RelHint> newHints = new ArrayList<>();
+ final List<RelHint> newHints;
+ if (node instanceof BiRel) {
+ BiRel biRel = (BiRel) node;
+ Optional<String> leftName = extractAliasOrTableName(biRel.getLeft());
+ Optional<String> rightName = extractAliasOrTableName(biRel.getRight());
+ newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints);
Review Comment:
Nit: I think `validateAndGetNewHints` is a good enough name, and it's fine to have an
overloaded method here. Also, I don't think passing `Optional<T>` as a method
argument is good practice.
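A minimal sketch of the overload idea, using a nullable `String` in place of `Optional<String>`. This is not the PR's code: the real methods take `List<RelHint>`, whereas this self-contained version works on one hint's KV options (table name or alias mapped to a TTL). The `LEFT`/`RIGHT` constants mirror `FlinkHints.LEFT_INPUT` and `FlinkHints.RIGHT_INPUT`, and the plain `equals` in `matchIdentifier` is an assumed stand-in for the resolver's own matching. Having the single-input overload return list options is one possible design for aggregates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HintOptionsSketch {
    // Hypothetical stand-ins for FlinkHints.LEFT_INPUT / FlinkHints.RIGHT_INPUT.
    static final String LEFT_INPUT = "LEFT";
    static final String RIGHT_INPUT = "RIGHT";

    private HintOptionsSketch() {}

    // Hypothetical stand-in for the resolver's matchIdentifier: plain equality.
    private static boolean matchIdentifier(String option, String name) {
        return option.equals(name);
    }

    /** BiRel overload: names may be null when an input has no alias or table name. */
    static Map<String, String> validateAndGetNewHints(
            String leftName, String rightName, Map<String, String> kvOptions) {
        Map<String, String> newOptions = new HashMap<>();
        kvOptions.forEach(
                (input, ttl) -> {
                    if (leftName != null && matchIdentifier(input, leftName)) {
                        newOptions.put(LEFT_INPUT, ttl);
                    }
                    if (rightName != null && matchIdentifier(input, rightName)) {
                        newOptions.put(RIGHT_INPUT, ttl);
                    }
                });
        return newOptions;
    }

    /** SingleRel overload: same name, one input, so a list option needs no INPUT key. */
    static List<String> validateAndGetNewHints(String inputName, Map<String, String> kvOptions) {
        List<String> newOptions = new ArrayList<>();
        if (inputName == null) {
            return newOptions; // nothing can match an unnamed input
        }
        kvOptions.forEach(
                (input, ttl) -> {
                    if (matchIdentifier(input, inputName)) {
                        newOptions.add(ttl);
                    }
                });
        return newOptions;
    }
}
```

With options `orders -> 1d` and `customers -> 3d`, the three-argument call for `orders`/`customers` maps `LEFT` to `1d` and `RIGHT` to `3d`, while the two-argument call for `orders` returns a single-element list containing `1d`.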
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java:
##########
@@ -52,10 +52,13 @@ public abstract class FlinkHints {
// ~ Internal alias tag hint
public static final String HINT_ALIAS = "ALIAS";
- // ~ Option name for hints on join or correlate
+ // ~ Option name for hints on BiRel like join or correlate
public static final String LEFT_INPUT = "LEFT";
public static final String RIGHT_INPUT = "RIGHT";
+ // ~ Option name for hints on SingleRel like aggregate
+ public static final String INPUT = "INPUT";
Review Comment:
Wouldn't it be more natural to convert the KV options to list options for
aggregates, since an aggregate has only one input?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java:
##########
@@ -122,8 +122,9 @@ public static HintStrategyTable createHintStrategyTable() {
.build())
.hintStrategy(
StateTtlHint.STATE_TTL.getHintName(),
- // TODO support agg state ttl hint
- HintStrategy.builder(HintPredicates.JOIN)
+ HintStrategy.builder(
Review Comment:
Nit: this comment is on `STATE_TTL_NON_EMPTY_KV_OPTION_CHECKER`.
We should keep the error message consistent with the template used by the other hints.
Suggested change:
```java
litmus.check(
        ttlHint.kvOptions.size() > 0,
        "Invalid STATE_TTL hint, expecting at least one key-value options specified.");
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java:
##########
@@ -87,4 +91,28 @@ public static Map<Integer, Long> getStateTtlFromHint(List<RelHint> hints) {
return stateTtlFromHint;
}
+
+ /**
+ * Get the state ttl from hints on the {@link org.apache.calcite.rel.SingleRel} such as
+ * Aggregate.
+ *
+ * @return the state ttl in milliseconds. If no state ttl hints set from hint, return "null".
+ */
+ @Nullable
+ public static Long getStateTtlFromHintOnSingleRel(List<RelHint> hints) {
+ AtomicReference<Long> stateTtlFromHint = new AtomicReference<>(null);
+ hints.stream()
+ .filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
+ .forEach(
+ hint ->
+ hint.kvOptions.forEach(
+ (input, ttl) -> {
+ if (FlinkHints.INPUT.equals(input)) {
+ stateTtlFromHint.set(
+ TimeUtils.parseDuration(ttl).toMillis());
+ }
+ }));
+
+ return stateTtlFromHint.get();
Review Comment:
```suggestion
return hints.stream()
.filter(hint -> StateTtlHint.isStateTtlHint(hint.hintName))
.flatMap(hint -> hint.listOptions.stream())
.map(ttl -> TimeUtils.parseDuration(ttl).toMillis())
.collect(Collectors.toList())
.get(0);
```
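One detail worth checking in the suggestion: `.collect(Collectors.toList()).get(0)` throws `IndexOutOfBoundsException` when no `STATE_TTL` hint is present, while the Javadoc promises `null`. A minimal sketch of a null-safe variant using `findFirst().orElse(null)` follows; like the suggestion it keeps the first match, whereas the `AtomicReference` version keeps the last. `Hint` is a hypothetical stand-in for Calcite's `RelHint` (only `hintName` and `listOptions`), `isStateTtlHint` assumes a case-insensitive name check, and `parseMillis` is a simplified stand-in for Flink's `TimeUtils.parseDuration(...).toMillis()` that only understands `ms`, `s`, `min`, `h` and `d` suffixes.

```java
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

final class StateTtlLookupSketch {
    /** Hypothetical stand-in for Calcite's RelHint, keeping only the fields used here. */
    static final class Hint {
        final String hintName;
        final List<String> listOptions;

        Hint(String hintName, List<String> listOptions) {
            this.hintName = hintName;
            this.listOptions = listOptions;
        }
    }

    // "ms" is listed before "s" because "500ms" also ends with "s".
    private static final String[] SUFFIXES = {"ms", "min", "s", "h", "d"};
    private static final TimeUnit[] UNITS = {
        TimeUnit.MILLISECONDS, TimeUnit.MINUTES, TimeUnit.SECONDS, TimeUnit.HOURS, TimeUnit.DAYS
    };

    private StateTtlLookupSketch() {}

    // Assumption: hint names are compared case-insensitively.
    static boolean isStateTtlHint(String hintName) {
        return "STATE_TTL".equalsIgnoreCase(hintName);
    }

    // Simplified stand-in for TimeUtils.parseDuration(ttl).toMillis().
    static long parseMillis(String ttl) {
        String t = ttl.trim().toLowerCase(Locale.ROOT);
        for (int i = 0; i < SUFFIXES.length; i++) {
            if (t.endsWith(SUFFIXES[i])) {
                String amount = t.substring(0, t.length() - SUFFIXES[i].length()).trim();
                return UNITS[i].toMillis(Long.parseLong(amount));
            }
        }
        throw new IllegalArgumentException("Unsupported duration: " + ttl);
    }

    /** Returns the first STATE_TTL list option in milliseconds, or null if there is none. */
    static Long getStateTtlFromHintOnSingleRel(List<Hint> hints) {
        return hints.stream()
                .filter(hint -> isStateTtlHint(hint.hintName))
                .flatMap(hint -> hint.listOptions.stream())
                .map(StateTtlLookupSketch::parseMillis) // only the first option is parsed
                .findFirst() // empty Optional when no STATE_TTL hint exists
                .orElse(null);
    }
}
```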
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java:
##########
@@ -228,6 +290,19 @@ && matchIdentifier(input, rightName.get())) {
return newOptions;
}
+ private Map<String, String> getNewStateTtlHintOptions(
+ Optional<String> inputName, Map<String, String> kvOptions, String hintName) {
+ updateInfoForOptionCheck(hintName, inputName);
+ Map<String, String> newOptions = new HashMap<>();
+ kvOptions.forEach(
+ (input, ttl) -> {
+ if (inputName.isPresent() && matchIdentifier(input, inputName.get())) {
+ newOptions.put(FlinkHints.INPUT, ttl);
+ }
+ });
+ return newOptions;
Review Comment:
How about returning list options?
```suggestion
Preconditions.checkArgument(kvOptions.size() == 1);
updateInfoForOptionCheck(hintName, inputName);
return kvOptions.entrySet().stream()
.filter(
entry ->
inputName.isPresent()
&& matchIdentifier(entry.getKey(), inputName.get()))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala:
##########
@@ -123,6 +130,7 @@ class StreamPhysicalGlobalGroupAggregate(
generateUpdateBefore,
needRetraction,
indexOfCountStar.map(Integer.valueOf).orNull,
+ StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
Review Comment:
This comment is on `explainTerms`.
Before the `STATE_TTL` hint was introduced, the other query hints (mainly join
hints for batch and lookup join hints for streaming) were converted and printed
with `itemIf`. So I've reconsidered the explain format. How about:
```scala
// for agg
super
  .explainTerms(pw)
  // ... existing items
  .itemIf(
    "stateTtl",
    StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
    hints.stream().anyMatch(hint => StateTtlHint.isStateTtlHint(hint.hintName)))
// which yields
// stateTtl=[1d]

// for regular join
val ttlHint = StateTtlHint.getStateTtlFromHintOnBiRel(getHints)
super
  .explainTerms(pw)
  // ... existing items
  .itemIf("leftStateTtl", ttlHint.get(0), ttlHint.containsKey(0))
  .itemIf("rightStateTtl", ttlHint.get(1), ttlHint.containsKey(1))
// which yields
// leftStateTtl=[1d], rightStateTtl=[3d]
```
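The appeal of `itemIf` here is that a term is printed only when its condition is true, so plans for queries without a `STATE_TTL` hint keep their existing explain output. The toy Java sketch below imitates that behaviour and the `name=[value]` rendering from the example above; `MiniRelWriter` and the `joinType` term are hypothetical, and this is not Calcite's `RelWriter` implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy writer imitating the item/itemIf calls used in explainTerms. */
final class MiniRelWriter {
    private final List<String> terms = new ArrayList<>();

    // Always records the term, rendered as name=[value].
    MiniRelWriter item(String name, Object value) {
        terms.add(name + "=[" + value + "]");
        return this;
    }

    // Records the term only when condition is true; otherwise nothing is added.
    MiniRelWriter itemIf(String name, Object value, boolean condition) {
        return condition ? item(name, value) : this;
    }

    // Renders the node as NodeName(term1, term2, ...).
    String explain(String nodeName) {
        return nodeName + "(" + String.join(", ", terms) + ")";
    }
}
```

With a TTL map holding `0 -> 1d` and `1 -> 3d`, the join renders as `Join(joinType=[InnerJoin], leftStateTtl=[1d], rightStateTtl=[3d])`; with an empty map, both `itemIf` calls are skipped and the output is just `Join(joinType=[InnerJoin])`.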