This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 54fa648 Revert "[FLINK-23544][table-planner] Window TVF Supports
session window in plan"
54fa648 is described below
commit 54fa64863f9e278ceddc8d294568e56350d9f545
Author: godfreyhe <[email protected]>
AuthorDate: Fri Sep 3 15:29:23 2021 +0800
Revert "[FLINK-23544][table-planner] Window TVF Supports session window in
plan"
This reverts commit 34d51002
Currently, the session window TVF's syntax is not correct. The syntax
mentioned in the FLIP depends on CALCITE-4337. We will re-introduce this
feature in FLINK-24024 after CALCITE-4337 is finished.
---
.../functions/sql/FlinkSqlOperatorTable.java | 1 -
.../functions/sql/SqlSessionTableFunction.java | 67 ------
.../functions/sql/SqlWindowTableFunction.java | 3 -
.../planner/plan/logical/SessionWindowSpec.java | 74 -------
.../table/planner/plan/logical/WindowSpec.java | 3 +-
.../exec/stream/StreamExecWindowTableFunction.java | 5 -
.../TwoStageOptimizedWindowAggregateRule.java | 7 -
.../StreamPhysicalLocalWindowAggregate.scala | 13 +-
.../stream/StreamPhysicalWindowAggregate.scala | 64 ++----
.../plan/rules/logical/SplitAggregateRule.scala | 10 +-
.../table/planner/plan/utils/WindowUtil.scala | 4 -
.../planner/plan/stream/sql/WindowRankTest.xml | 53 -----
.../plan/stream/sql/agg/WindowAggregateTest.xml | 236 ---------------------
.../plan/stream/sql/join/WindowJoinTest.xml | 139 ------------
.../planner/plan/stream/sql/WindowRankTest.scala | 79 +------
.../plan/stream/sql/agg/WindowAggregateTest.scala | 64 ------
.../plan/stream/sql/join/WindowJoinTest.scala | 106 ---------
.../runtime/stream/sql/WindowAggregateITCase.scala | 87 --------
.../window/WindowTableFunctionOperator.java | 3 -
19 files changed, 21 insertions(+), 997 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 877fff9..2c6d46e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1181,5 +1181,4 @@ public class FlinkSqlOperatorTable extends
ReflectiveSqlOperatorTable {
public static final SqlFunction TUMBLE = new SqlTumbleTableFunction();
public static final SqlFunction HOP = new SqlHopTableFunction();
public static final SqlFunction CUMULATE = new SqlCumulateTableFunction();
- public static final SqlFunction SESSION = new SqlSessionTableFunction();
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
deleted file mode 100644
index 629da14..0000000
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.sql;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperator;
-
-/**
- * SqlSessionTableFunction implements an operator for session.
- *
- * <p>It allows three parameters:
- *
- * <ol>
- * <li>a table
- * <li>a descriptor to provide a time attribute column name from the input
table
- * <li>an interval parameter to specify an inactive activity gap to break
sessions
- * </ol>
- */
-public class SqlSessionTableFunction extends SqlWindowTableFunction {
-
- public SqlSessionTableFunction() {
- super(SqlKind.SESSION.name(), new OperandMetadataImpl());
- }
-
- /** Operand type checker for SESSION. */
- private static class OperandMetadataImpl extends AbstractOperandMetadata {
- OperandMetadataImpl() {
- super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL,
PARAM_SESSION_GAP), 3);
- }
-
- @Override
- public boolean checkOperandTypes(SqlCallBinding callBinding, boolean
throwOnFailure) {
- if (!checkTableAndDescriptorOperands(callBinding, 1)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
- }
- if (!checkIntervalOperands(callBinding, 2)) {
- return throwValidationSignatureErrorOrReturnFalse(callBinding,
throwOnFailure);
- }
- // check time attribute
- return throwExceptionOrReturnFalse(
- checkTimeColumnDescriptorOperand(callBinding, 1),
throwOnFailure);
- }
-
- @Override
- public String getAllowedSignatures(SqlOperator op, String opName) {
- return opName + "(TABLE table_name, DESCRIPTOR(timecol), datetime
interval)";
- }
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
index 3c23dde..41926ff 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
@@ -83,9 +83,6 @@ public class SqlWindowTableFunction extends SqlFunction
implements SqlTableFunct
/** The slide interval, only used for HOP window. */
protected static final String PARAM_STEP = "STEP";
- /** The session gap interval, only used for SESSION window. */
- protected static final String PARAM_SESSION_GAP = "GAP";
-
/**
* Type-inference strategy whereby the row type of a table function call
is a ROW, which is
* combined from the row type of operand #0 (which is a TABLE) and two
additional fields. The
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java
deleted file mode 100644
index 962e5ba..0000000
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.logical;
-
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-
-import java.time.Duration;
-import java.util.Objects;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.TimeUtils.formatWithHighestUnit;
-
-/** Logical representation of a session window specification. */
-@JsonTypeName("SessionWindow")
-public class SessionWindowSpec implements WindowSpec {
- public static final String FIELD_NAME_GAP = "gap";
-
- @JsonProperty(FIELD_NAME_GAP)
- private final Duration gap;
-
- @JsonCreator
- public SessionWindowSpec(@JsonProperty(FIELD_NAME_GAP) Duration gap) {
- this.gap = checkNotNull(gap);
- }
-
- @Override
- public String toSummaryString(String windowing) {
- return String.format("SESSION(%s, gap=[%s])", windowing,
formatWithHighestUnit(gap));
- }
-
- public Duration getGap() {
- return gap;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SessionWindowSpec that = (SessionWindowSpec) o;
- return gap.equals(that.gap);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(SessionWindowSpec.class, gap);
- }
-
- @Override
- public String toString() {
- return String.format("SESSION(gap=[%s])", formatWithHighestUnit(gap));
- }
-}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
index fbde8a9..bb3c261 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowSpec.java
@@ -26,8 +26,7 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@JsonSubTypes({
@JsonSubTypes.Type(value = TumblingWindowSpec.class),
@JsonSubTypes.Type(value = HoppingWindowSpec.class),
- @JsonSubTypes.Type(value = CumulativeWindowSpec.class),
- @JsonSubTypes.Type(value = SessionWindowSpec.class)
+ @JsonSubTypes.Type(value = CumulativeWindowSpec.class)
})
public interface WindowSpec {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
index af0a6c5..5fd2025 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec;
import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec;
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
import
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec;
import org.apache.flink.table.planner.plan.logical.WindowSpec;
@@ -120,10 +119,6 @@ public class StreamExecWindowTableFunction extends
ExecNodeBase<RowData>
+ "3. join with join condition contains
window starts equality of input tables "
+ "and window ends equality of input
tables.\n",
windowSummary));
- } else if (windowingStrategy.getWindow() instanceof SessionWindowSpec)
{
- // WindowTableFunctionOperator is not suitable for Session Window
because can't do
- // state-less window assigning for input row per record for
Session Window.
- throw new TableException("Session Window TableFunction is not
supported yet.");
} else if (!windowingStrategy.isRowtime()) {
throw new TableException("Processing time Window TableFunction is
not supported yet.");
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
index b956729..8eeb79c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/TwoStageOptimizedWindowAggregateRule.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.plan.rules.physical.stream;
import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec;
import
org.apache.flink.table.planner.plan.logical.SliceAttachedWindowingStrategy;
import
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
import
org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
@@ -96,12 +95,6 @@ public class TwoStageOptimizedWindowAggregateRule extends
RelOptRule {
return false;
}
- // session window doesn't support two-phase,
- // otherwise window assigner results may be different
- if (windowing.getWindow() instanceof SessionWindowSpec) {
- return false;
- }
-
// all aggregate function should support merge() method
if
(!AggregateUtil.doAllSupportPartialMerge(windowAgg.aggInfoList().aggInfos())) {
return false;
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
index 7035e99..2823aab 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLocalWindowAggregate.scala
@@ -20,7 +20,7 @@ package
org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty,
PlannerSliceEnd, PlannerWindowReference}
-import org.apache.flink.table.planner.plan.logical.{SessionWindowSpec,
TimeAttributeWindowingStrategy, WindowAttachedWindowingStrategy,
WindowingStrategy}
+import
org.apache.flink.table.planner.plan.logical.{TimeAttributeWindowingStrategy,
WindowAttachedWindowingStrategy, WindowingStrategy}
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import
org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedWindowAggregateRule
@@ -69,17 +69,8 @@ class StreamPhysicalLocalWindowAggregate(
override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = {
windowing match {
- case _: WindowAttachedWindowingStrategy =>
+ case _: WindowAttachedWindowingStrategy | _:
TimeAttributeWindowingStrategy =>
// pass
- case tws: TimeAttributeWindowingStrategy =>
- tws.getWindow match {
- case _: SessionWindowSpec =>
- return litmus.fail("StreamPhysicalLocalWindowAggregate should not
accept " +
- "TimeAttributeWindowingStrategy with Session window. " +
- "This should never happen, please open an issue.")
- case _ =>
- // pass
- }
case _ =>
return litmus.fail("StreamPhysicalLocalWindowAggregate should only
accepts " +
"WindowAttachedWindowingStrategy and TimeAttributeWindowingStrategy,
" +
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
index 0837c1d..eaa70e2 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalWindowAggregate.scala
@@ -18,23 +18,22 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
-import org.apache.flink.table.expressions.ApiExpressionUtils.intervalOfMillis
-import org.apache.flink.table.expressions.FieldReferenceExpression
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty,
PlannerWindowReference}
-import org.apache.flink.table.planner.plan.logical.{SessionGroupWindow,
SessionWindowSpec, TimeAttributeWindowingStrategy, WindowingStrategy}
-import
org.apache.flink.table.planner.plan.nodes.exec.stream.{StreamExecGroupWindowAggregate,
StreamExecWindowAggregate}
+import org.apache.flink.table.planner.expressions.{PlannerNamedWindowProperty,
PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd,
PlannerWindowStart}
+import org.apache.flink.table.planner.plan.logical.WindowingStrategy
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.utils.{AggregateInfoList,
AggregateUtil, FlinkRelOptUtil, RelExplainUtil, WindowUtil}
import
org.apache.flink.table.planner.plan.utils.WindowUtil.checkEmitConfiguration
-import
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType
-
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.util.ImmutableBitSet
import java.util
+import java.util.Collections
import scala.collection.JavaConverters._
@@ -105,45 +104,14 @@ class StreamPhysicalWindowAggregate(
override def translateToExecNode(): ExecNode[_] = {
checkEmitConfiguration(FlinkRelOptUtil.getTableConfigFromContext(this))
- windowing.getWindow match {
- case windowSpec: SessionWindowSpec =>
- windowing match {
- case timeWindowStrategy: TimeAttributeWindowingStrategy =>
- val timeAttributeFieldName = getInput.getRowType.getFieldNames.get(
- timeWindowStrategy.getTimeAttributeIndex)
- val timeAttributeType = windowing.getTimeAttributeType
- val logicalWindow = SessionGroupWindow(
- new PlannerWindowReference("w$", timeAttributeType),
- new FieldReferenceExpression(
- timeAttributeFieldName,
- fromLogicalTypeToDataType(timeAttributeType),
- 0,
- timeWindowStrategy.getTimeAttributeIndex),
- intervalOfMillis(windowSpec.getGap.toMillis)
- )
- new StreamExecGroupWindowAggregate(
- grouping,
- aggCalls.toArray,
- logicalWindow,
- namedWindowProperties.toArray,
- false,
- InputProperty.DEFAULT,
- FlinkTypeFactory.toLogicalRowType(getRowType),
- getRelDetailedDescription
- )
- case _ =>
- throw new UnsupportedOperationException(s"$windowing is not
supported yet.")
- }
- case _ =>
- new StreamExecWindowAggregate(
- grouping,
- aggCalls.toArray,
- windowing,
- namedWindowProperties.toArray,
- InputProperty.DEFAULT,
- FlinkTypeFactory.toLogicalRowType(getRowType),
- getRelDetailedDescription
- )
- }
+ new StreamExecWindowAggregate(
+ grouping,
+ aggCalls.toArray,
+ windowing,
+ namedWindowProperties.toArray,
+ InputProperty.DEFAULT,
+ FlinkTypeFactory.toLogicalRowType(getRowType),
+ getRelDetailedDescription
+ )
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
index bbe710d..31d1f25 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitAggregateRule.scala
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.planner.calcite.{FlinkContext,
FlinkLogicalRelFactories, FlinkRelBuilder}
import org.apache.flink.table.planner.functions.sql.{FlinkSqlOperatorTable,
SqlFirstLastValueAggFunction}
-import org.apache.flink.table.planner.plan.logical.SessionWindowSpec
import org.apache.flink.table.planner.plan.PartialFinalType
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
@@ -139,18 +138,11 @@ class SplitAggregateRule extends RelOptRule(
val windowProps = fmq.getRelWindowProperties(agg.getInput)
val isWindowAgg =
WindowUtil.groupingContainsWindowStartEnd(agg.getGroupSet, windowProps)
val isProctimeWindowAgg = isWindowAgg && !windowProps.isRowtime
-
- // disable distinct split for session window,
- // otherwise window assigner results may be different
- val isSessionWindowAgg = isWindowAgg &&
- windowProps.getWindowSpec.isInstanceOf[SessionWindowSpec]
-
// TableAggregate is not supported. see also FLINK-21923.
val isTableAgg = AggregateUtil.isTableAggregate(agg.getAggCallList)
agg.partialFinalType == PartialFinalType.NONE &&
agg.containsDistinctCall() &&
- splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg &&
- !isTableAgg && !isSessionWindowAgg
+ splitDistinctAggEnabled && isAllAggSplittable && !isProctimeWindowAgg &&
!isTableAgg
}
override def onMatch(call: RelOptRuleCall): Unit = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index edd8f0e..37056b8 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -218,10 +218,6 @@ object WindowUtil {
val step = getOperandAsLong(windowCall.operands(2))
val maxSize = getOperandAsLong(windowCall.operands(3))
new CumulativeWindowSpec(Duration.ofMillis(maxSize),
Duration.ofMillis(step), offset)
-
- case FlinkSqlOperatorTable.SESSION =>
- val gap = getOperandAsLong(windowCall.operands(2))
- new SessionWindowSpec(Duration.ofMillis(gap))
}
new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType,
timeIndex)
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
index c3061ad..0b7a643 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
@@ -128,59 +128,6 @@ Calc(select=[window_start, window_end, window_time, a,
cnt, sum_d, max_d, wAvg,
]]>
</Resource>
</TestCase>
- <TestCase name="testOnSessionWindowAggregate">
- <Resource name="sql">
- <![CDATA[
-SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d, wAvg, uv
-FROM (
-SELECT *,
- ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY cnt DESC)
as rownum
-FROM (
- SELECT
- a,
- window_start,
- window_end,
- window_time,
- count(*) as cnt,
- sum(d) as sum_d,
- max(d) filter (where b > 1000) as max_d,
- weightedAvg(b, e) AS wAvg,
- count(distinct c) AS uv
- FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
- GROUP BY a, window_start, window_end, window_time
- )
-)
-WHERE rownum <= 3
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(window_start=[$1], window_end=[$2], window_time=[$3], a=[$0],
cnt=[$4], sum_d=[$5], max_d=[$6], wAvg=[$7], uv=[$8])
-+- LogicalFilter(condition=[<=($9, 3)])
- +- LogicalProject(a=[$0], window_start=[$1], window_end=[$2],
window_time=[$3], cnt=[$4], sum_d=[$5], max_d=[$6], wAvg=[$7], uv=[$8],
rownum=[ROW_NUMBER() OVER (PARTITION BY $1, $2 ORDER BY $4 DESC NULLS LAST)])
- +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()],
sum_d=[SUM($4)], max_d=[MAX($4) FILTER $5], wAvg=[weightedAvg($6, $7)],
uv=[COUNT(DISTINCT $8)])
- +- LogicalProject(a=[$0], window_start=[$7], window_end=[$8],
window_time=[$9], d=[$3], $f5=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6,
DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[window_start, window_end, window_time, a, cnt, sum_d, max_d,
wAvg, uv])
-+- WindowRank(window=[SESSION(win_start=[window_start], win_end=[window_end],
gap=[15 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3],
partitionBy=[], orderBy=[cnt DESC], select=[a, window_start, window_end,
window_time, cnt, sum_d, max_d, wAvg, uv])
- +- Exchange(distribution=[single])
- +- Calc(select=[a, window_start, window_end, window_time, cnt, sum_d,
max_d, wAvg, uv])
- +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime],
gap=[15 min])], select=[a, COUNT(*) AS cnt, SUM(d) AS sum_d, MAX(d) FILTER $f5
AS max_d, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f5, b, e, c,
rowtime])
- +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
- +- TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testOnTumbleWindowAggregate">
<Resource name="sql">
<![CDATA[
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 4903153f8..5141a01 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -1583,242 +1583,6 @@ Sink(table=[default_catalog.default_database.s1],
fields=[window_start, window_e
]]>
</Resource>
</TestCase>
- <TestCase
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- a,
- window_start,
- window_end,
- count(*),
- sum(d),
- max(d) filter (where b > 1000),
- count(distinct c) AS uv
-FROM TABLE(
- SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS
EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
window_end])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d,
rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- a,
- window_start,
- window_end,
- count(*),
- sum(d),
- max(d) filter (where b > 1000),
- count(distinct c) AS uv
-FROM TABLE(
- SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS
EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
window_end])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d,
rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSession_OnProctime[aggPhaseEnforcer=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- a,
- window_start,
- window_end,
- count(*),
- sum(d),
- max(d) filter (where b > 1000),
- weightedAvg(b, e) AS wAvg,
- count(distinct c) AS uv
-FROM TABLE(
- SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSession_OnProctime[aggPhaseEnforcer=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- a,
- window_start,
- window_end,
- count(*),
- sum(d),
- max(d) filter (where b > 1000),
- weightedAvg(b, e) AS wAvg,
- count(distinct c) AS uv
-FROM TABLE(
- SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($6),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- a,
- window_start,
- window_end,
- count(*),
- sum(d),
- max(d) filter (where b > 1000),
- weightedAvg(b, e) AS wAvg,
- count(distinct c) AS uv
-FROM TABLE(
- SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($5),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- a,
- window_start,
- window_end,
- count(*),
- sum(d),
- max(d) filter (where b > 1000),
- weightedAvg(b, e) AS wAvg,
- count(distinct c) AS uv
-FROM TABLE(
- SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
-GROUP BY a, window_start, window_end
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
-+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($6, DESCRIPTOR($5),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
-+- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5
min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS
EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testTumble_CalcOnTVF[aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
<![CDATA[
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index c57ba95..5991c45 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -1163,145 +1163,6 @@ Calc(select=[a, window_start, window_end,
PROCTIME_MATERIALIZE(window_time) AS w
]]>
</Resource>
</TestCase>
- <TestCase name="testOnSessionWindowAggregate">
- <Resource name="sql">
- <![CDATA[
-SELECT L.*, R.*
-FROM (
- SELECT
- a,
- window_start,
- window_end,
- window_time,
- count(*) as cnt,
- count(distinct c) AS uv
- FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
- GROUP BY a, window_start, window_end, window_time
-) L
-JOIN (
- SELECT
- a,
- window_start,
- window_end,
- window_time,
- count(*) as cnt,
- count(distinct c) AS uv
- FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
- GROUP BY a, window_start, window_end, window_time
-) R
-ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a =
R.a
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3],
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8],
window_time0=[$9], cnt0=[$10], uv0=[$11])
-+- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))],
joinType=[inner])
- :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT
$4)])
- : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6],
window_time=[$7], c=[$2])
- : +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($3),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b,
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME*
window_time)])
- : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[$4])
- : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3,
1000:INTERVAL SECOND)])
- : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
- +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT
$4)])
- +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6],
window_time=[$7], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($3),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b,
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME*
window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[$4])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable2]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-WindowJoin(leftWindow=[SESSION(win_start=[window_start], win_end=[window_end],
gap=[15 min])], rightWindow=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[15 min])], joinType=[InnerJoin], where=[=(a, a0)],
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0,
window_end0, window_time0, cnt0, uv0])
-:- Exchange(distribution=[hash[a]])
-: +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
-: +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime],
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$)
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
-: +- Exchange(distribution=[hash[a]])
-: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
-: +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
-+- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
- +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime],
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$)
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
- +- Exchange(distribution=[hash[a]])
- +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testOnSessionWindowAggregateOnProctime">
- <Resource name="sql">
- <![CDATA[
-SELECT L.*, R.*
-FROM (
- SELECT
- a,
- window_start,
- window_end,
- window_time,
- count(*) as cnt,
- count(distinct c) AS uv
- FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15'
MINUTE))
- GROUP BY a, window_start, window_end, window_time
-) L
-JOIN (
- SELECT
- a,
- window_start,
- window_end,
- window_time,
- count(*) as cnt,
- count(distinct c) AS uv
- FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15'
MINUTE))
- GROUP BY a, window_start, window_end, window_time
-) R
-ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a =
R.a
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3],
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8],
window_time0=[$9], cnt0=[$10], uv0=[$11])
-+- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))],
joinType=[inner])
- :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT
$4)])
- : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6],
window_time=[$7], c=[$2])
- : +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($4),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b,
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
- : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[$4])
- : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3,
1000:INTERVAL SECOND)])
- : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[PROCTIME()])
- : +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
- +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT
$4)])
- +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6],
window_time=[$7], c=[$2])
- +- LogicalTableFunctionScan(invocation=[SESSION($4, DESCRIPTOR($4),
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b,
BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime,
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME*
window_time)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[$4])
- +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3,
1000:INTERVAL SECOND)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3],
proctime=[PROCTIME()])
- +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable2]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS
window_time, cnt, uv, a0, window_start0, window_end0,
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
-+- WindowJoin(leftWindow=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[15 min])],
rightWindow=[SESSION(win_start=[window_start], win_end=[window_end], gap=[15
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start,
window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0,
cnt0, uv0])
- :- Exchange(distribution=[hash[a]])
- : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
- : +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime],
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$)
AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
- : +- Exchange(distribution=[hash[a]])
- : +- Calc(select=[a, c, proctime])
- : +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
- : +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
- : +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
- +- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime],
gap=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$)
AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
- +- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, c, proctime])
- +- WatermarkAssigner(rowtime=[rowtime],
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
- +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
- +- TableSourceScan(table=[[default_catalog,
default_database, MyTable2, project=[a, c, rowtime], metadata=[]]], fields=[a,
c, rowtime])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testOnTumbleWindowAggregate">
<Resource name="sql">
<![CDATA[
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
index fae3d5c..b11a7f5 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.scala
@@ -82,25 +82,7 @@ class WindowRankTest extends TableTestBase {
}
@Test
- def testUnsupportedWindowTVF_Session(): Unit = {
- val sql =
- """
- |SELECT window_start, window_end, window_time, a, b, c, d, e
- |FROM (
- |SELECT *,
- | ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER
BY b DESC) as rownum
- |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5'
MINUTE))
- |)
- |WHERE rownum <= 3
- """.stripMargin
-
- thrown.expectMessage("Session Window TableFunction is not supported yet.")
- thrown.expect(classOf[TableException])
- util.verifyExplain(sql)
- }
-
- @Test
- def testUnsupportedWindowTVF_Proctime(): Unit = {
+ def testUnsupportedWindowTVF_TumbleOnProctime(): Unit = {
val sql =
"""
|SELECT window_start, window_end, window_time, a, b, c, d, e
@@ -408,65 +390,6 @@ class WindowRankTest extends TableTestBase {
util.verifyExplain(sql)
}
- @Test
- def testOnSessionWindowAggregate(): Unit = {
- val sql =
- """
- |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d,
wAvg, uv
- |FROM (
- |SELECT *,
- | ROW_NUMBER() OVER(PARTITION BY window_start, window_end ORDER BY
cnt DESC) as rownum
- |FROM (
- | SELECT
- | a,
- | window_start,
- | window_end,
- | window_time,
- | count(*) as cnt,
- | sum(d) as sum_d,
- | max(d) filter (where b > 1000) as max_d,
- | weightedAvg(b, e) AS wAvg,
- | count(distinct c) AS uv
- | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL
'15' MINUTE))
- | GROUP BY a, window_start, window_end, window_time
- | )
- |)
- |WHERE rownum <= 3
- """.stripMargin
- util.verifyRelPlan(sql)
- }
-
- @Test
- def testOnSessionWindowAggregateOnProctime(): Unit = {
- val sql =
- """
- |SELECT window_start, window_end, window_time, a, cnt, sum_d, max_d,
wAvg, uv
- |FROM (
- |SELECT *,
- | ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end ORDER
BY cnt DESC) as rownum
- |FROM (
- | SELECT
- | a,
- | window_start,
- | window_end,
- | window_time,
- | count(*) as cnt,
- | sum(d) as sum_d,
- | max(d) filter (where b > 1000) as max_d,
- | weightedAvg(b, e) AS wAvg,
- | count(distinct c) AS uv
- | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL
'15' MINUTE))
- | GROUP BY a, window_start, window_end, window_time
- | )
- |)
- |WHERE rownum <= 3
- """.stripMargin
-
- thrown.expect(classOf[TableException])
- thrown.expectMessage("Processing time Window TopN is not supported yet.")
- util.verifyExplain(sql)
- }
-
//
----------------------------------------------------------------------------------------
// Tests for queries window rank could propagate time attribute
//
----------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index c3d1a25..96d3988 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -425,70 +425,6 @@ class WindowAggregateTest(aggPhaseEnforcer:
AggregatePhaseStrategy) extends Tabl
}
@Test
- def testSession_OnRowtime(): Unit = {
- // Session window does not two-phase optimization
- val sql =
- """
- |SELECT
- | a,
- | window_start,
- | window_end,
- | count(*),
- | sum(d),
- | max(d) filter (where b > 1000),
- | weightedAvg(b, e) AS wAvg,
- | count(distinct c) AS uv
- |FROM TABLE(
- | SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
- |GROUP BY a, window_start, window_end
- """.stripMargin
- util.verifyRelPlan(sql)
- }
-
- @Test
- def testSession_OnProctime(): Unit = {
- // Session window does not two-phase optimization
- val sql =
- """
- |SELECT
- | a,
- | window_start,
- | window_end,
- | count(*),
- | sum(d),
- | max(d) filter (where b > 1000),
- | weightedAvg(b, e) AS wAvg,
- | count(distinct c) AS uv
- |FROM TABLE(
- | SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
- |GROUP BY a, window_start, window_end
- """.stripMargin
- util.verifyRelPlan(sql)
- }
-
- @Test
- def testSession_DistinctSplitEnabled(): Unit = {
- // Session window does not split-distinct optimization
- util.tableEnv.getConfig.getConfiguration.setBoolean(
- OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
- val sql =
- """
- |SELECT
- | a,
- | window_start,
- | window_end,
- | count(*),
- | sum(d),
- | max(d) filter (where b > 1000),
- | count(distinct c) AS uv
- |FROM TABLE(
- | SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE))
- |GROUP BY a, window_start, window_end
- """.stripMargin
- util.verifyRelPlan(sql)
- }
-
- @Test
def testCumulate_DistinctSplitEnabled(): Unit = {
util.tableEnv.getConfig.getConfiguration.setBoolean(
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
index df1931d..1c9b845 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.scala
@@ -372,48 +372,6 @@ class WindowJoinTest extends TableTestBase {
util.verifyExplain(sql)
}
- @Test
- def testUnsupportedWindowTVF_SessionOnRowtime(): Unit = {
- val sql =
- """
- |SELECT *
- |FROM (
- | SELECT *
- | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL
'15' MINUTE))
- |) L
- |JOIN (
- | SELECT *
- | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL
'15' MINUTE))
- |) R
- |ON L.window_start = R.window_start AND L.window_end = R.window_end
AND L.a = R.a
- """.stripMargin
-
- thrown.expectMessage("Session Window TableFunction is not supported yet.")
- thrown.expect(classOf[TableException])
- util.verifyExplain(sql)
- }
-
- @Test
- def testUnsupportedWindowTVF_SessionOnProctime(): Unit = {
- val sql =
- """
- |SELECT L.a, L.b, L.c, R.a, R.b, R.c
- |FROM (
- | SELECT *
- | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL
'15' MINUTE))
- |) L
- |JOIN (
- | SELECT *
- | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL
'15' MINUTE))
- |) R
- |ON L.window_start = R.window_start AND L.window_end = R.window_end
AND L.a = R.a
- """.stripMargin
-
- thrown.expectMessage("Processing time Window Join is not supported yet.")
- thrown.expect(classOf[TableException])
- util.verifyExplain(sql)
- }
-
//
----------------------------------------------------------------------------------------
// Tests for invalid queries Join on window Aggregate
// because left window strategy is not equals to right window strategy.
@@ -945,70 +903,6 @@ class WindowJoinTest extends TableTestBase {
}
@Test
- def testOnSessionWindowAggregate(): Unit = {
- val sql =
- """
- |SELECT L.*, R.*
- |FROM (
- | SELECT
- | a,
- | window_start,
- | window_end,
- | window_time,
- | count(*) as cnt,
- | count(distinct c) AS uv
- | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL
'15' MINUTE))
- | GROUP BY a, window_start, window_end, window_time
- |) L
- |JOIN (
- | SELECT
- | a,
- | window_start,
- | window_end,
- | window_time,
- | count(*) as cnt,
- | count(distinct c) AS uv
- | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL
'15' MINUTE))
- | GROUP BY a, window_start, window_end, window_time
- |) R
- |ON L.window_start = R.window_start AND L.window_end = R.window_end
AND L.a = R.a
- """.stripMargin
- util.verifyRelPlan(sql)
- }
-
- @Test
- def testOnSessionWindowAggregateOnProctime(): Unit = {
- val sql =
- """
- |SELECT L.*, R.*
- |FROM (
- | SELECT
- | a,
- | window_start,
- | window_end,
- | window_time,
- | count(*) as cnt,
- | count(distinct c) AS uv
- | FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL
'15' MINUTE))
- | GROUP BY a, window_start, window_end, window_time
- |) L
- |JOIN (
- | SELECT
- | a,
- | window_start,
- | window_end,
- | window_time,
- | count(*) as cnt,
- | count(distinct c) AS uv
- | FROM TABLE(SESSION(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL
'15' MINUTE))
- | GROUP BY a, window_start, window_end, window_time
- |) R
- |ON L.window_start = R.window_start AND L.window_end = R.window_end
AND L.a = R.a
- """.stripMargin
- util.verifyRelPlan(sql)
- }
-
- @Test
def testWindowJoinWithNonEqui(): Unit = {
val sql =
"""
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
index b02258f..3e79333 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.scala._
-import org.apache.flink.table.api._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.OptimizerConfigOptions
@@ -28,7 +27,6 @@ import
org.apache.flink.table.planner.factories.TestValuesTableFactory
import
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND,
ROCKSDB_BACKEND, StateBackendMode}
import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource,
StreamingWithStateTestBase, TestData, TestingAppendSink}
-import
org.apache.flink.table.planner.runtime.utils.TimeTestUtil.TimestampAndWatermarkWithOffset
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy._
import org.apache.flink.types.Row
@@ -902,91 +900,6 @@ class WindowAggregateITCase(
CumulateWindowRollupExpectedData.sorted.mkString("\n"),
sink.getAppendResults.sorted.mkString("\n"))
}
-
- @Test
- def testEventTimeSessionWindow(): Unit = {
- //To verify the "merge" functionality, we create this test with the
following characteristics:
- // 1. set the Parallelism to 1, and have the test data out of order
- // 2. create a waterMark with 10ms offset to delay the window emission by
10ms
- val sessionData = List(
- (1L, 1, "Hello", "a"),
- (2L, 2, "Hello", "b"),
- (8L, 8, "Hello", "a"),
- (9L, 9, "Hello World", "b"),
- (4L, 4, "Hello", "c"),
- (16L, 16, "Hello", "d"))
-
- val stream = failingDataSource(sessionData)
- .assignTimestampsAndWatermarks(
- new TimestampAndWatermarkWithOffset[(Long, Int, String, String)](10L))
- val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'string, 'name)
- tEnv.registerTable("T", table)
-
- val sql =
- """
- |SELECT
- | `string`,
- | window_start,
- | window_time,
- | COUNT(1),
- | SUM(1),
- | COUNT(`int`),
- | SUM(`int`),
- | COUNT(DISTINCT name)
- |FROM TABLE(
- | SESSION(TABLE T, DESCRIPTOR(rowtime), INTERVAL '0.005' SECOND))
- |GROUP BY `string`, window_start, window_end, window_time
- """.stripMargin
-
- val sink = new TestingAppendSink
- tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
- env.execute()
-
- val expected = Seq(
- "Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1",
- "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1",
- "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3")
- assertEquals(expected.sorted, sink.getAppendResults.sorted)
- }
-
- @Test
- def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
- // create a watermark with 10ms offset to delay the window emission by
10ms to verify merge
- val sessionWindowTestData = List(
- (1L, 2, "Hello"), // (1, Hello) - window
- (2L, 2, "Hello"), // (1, Hello) - window, deduped
- (8L, 2, "Hello"), // (2, Hello) - window, deduped during
merge
- (10L, 3, "Hello"), // (2, Hello) - window, forwarded during
merge
- (9L, 9, "Hello World"), // (1, Hello World) - window
- (4L, 1, "Hello"), // (1, Hello) - window, triggering merge
- (16L, 16, "Hello")) // (3, Hello) - window (not merged)
-
- val stream = failingDataSource(sessionWindowTestData)
- .assignTimestampsAndWatermarks(new
TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
- val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
- tEnv.registerTable("MyTable", table)
-
- val sqlQuery =
- """
- |SELECT c,
- | COUNT(DISTINCT b),
- | window_end
- |FROM TABLE(
- | SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0.005'
SECOND))
- |GROUP BY c, window_start, window_end
- """.stripMargin
- val sink = new TestingAppendSink
- tEnv.sqlQuery(sqlQuery).toAppendStream[Row].addSink(sink)
- env.execute()
-
- val expected = Seq(
- "Hello World,1,1970-01-01T00:00:00.014", // window starts at [9L] till
{14L}
- "Hello,1,1970-01-01T00:00:00.021", // window starts at [16L] till
{21L}, not merged
- "Hello,3,1970-01-01T00:00:00.015" // window starts at [1L,2L],
- // merged with [8L,10L], by [4L], till {15L}
- )
- assertEquals(expected.sorted, sink.getAppendResults.sorted)
- }
}
object WindowAggregateITCase {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
index 51dc68e..22911d7 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowTableFunctionOperator.java
@@ -43,9 +43,6 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
*
* <p>Note: The operator only works for row-time.
*
- * <p>Note: The operator is not suitable for Session Window because can't do
state-less window
- * assigning for input row per record for Session Window.
- *
* <p>Note: The operator emits per record instead of at the end of window.
*/
public class WindowTableFunctionOperator extends TableStreamOperator<RowData>