This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new ea2bddc Revert "[FLINK-23434][table-planner-blink] Fix the
inconsistent type in IncrementalAggregateRule when the query has one distinct
agg function and count star agg function"
ea2bddc is described below
commit ea2bddc8e82ab4b9797f40534b1e5637eef4c5b5
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jul 23 17:04:51 2021 +0200
Revert "[FLINK-23434][table-planner-blink] Fix the inconsistent type in
IncrementalAggregateRule when the query has one distinct agg function and count
star agg function"
This reverts commit c8ad0ec1e5ad21da6b7663944d99f7e413aae0c9.
---
.../stream/StreamExecGlobalGroupAggregate.java | 17 -
.../StreamPhysicalGlobalGroupAggregate.scala | 19 +-
.../physical/stream/IncrementalAggregateRule.scala | 21 +-
.../table/planner/plan/utils/AggregateUtil.scala | 33 -
.../stream/IncrementalAggregateJsonPlanTest.java | 21 -
...lAggregateWithSumCountDistinctAndRetraction.out | 745 ---------------------
.../plan/stream/sql/agg/DistinctAggregateTest.xml | 179 +----
.../stream/sql/agg/IncrementalAggregateTest.xml | 41 +-
.../stream/sql/agg/DistinctAggregateTest.scala | 16 -
.../runtime/stream/sql/SplitAggregateITCase.scala | 52 --
10 files changed, 30 insertions(+), 1114 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 67ccfab..e6062c0 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -50,7 +50,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -60,12 +59,9 @@ import org.apache.calcite.tools.RelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,7 +71,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
private static final Logger LOG =
LoggerFactory.getLogger(StreamExecGlobalGroupAggregate.class);
public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE =
"localAggInputRowType";
- public static final String FIELD_NAME_INDEX_OF_COUNT_STAR =
"indexOfCountStar";
@JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
@@ -101,11 +96,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
@JsonProperty(FIELD_NAME_NEED_RETRACTION)
private final boolean needRetraction;
- /** The position for the existing count star. */
- @JsonProperty(FIELD_NAME_INDEX_OF_COUNT_STAR)
- @JsonInclude(JsonInclude.Include.NON_NULL)
- protected final Integer indexOfCountStar;
-
public StreamExecGlobalGroupAggregate(
int[] grouping,
AggregateCall[] aggCalls,
@@ -113,7 +103,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
RowType localAggInputRowType,
boolean generateUpdateBefore,
boolean needRetraction,
- @Nullable Integer indexOfCountStar,
InputProperty inputProperty,
RowType outputType,
String description) {
@@ -124,7 +113,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
localAggInputRowType,
generateUpdateBefore,
needRetraction,
- indexOfCountStar,
getNewNodeId(),
Collections.singletonList(inputProperty),
outputType,
@@ -139,7 +127,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
@JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE) RowType
localAggInputRowType,
@JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean
generateUpdateBefore,
@JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
- @JsonProperty(FIELD_NAME_INDEX_OF_COUNT_STAR) @Nullable Integer
indexOfCountStar,
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@@ -152,8 +139,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
this.localAggInputRowType = checkNotNull(localAggInputRowType);
this.generateUpdateBefore = generateUpdateBefore;
this.needRetraction = needRetraction;
- checkArgument(indexOfCountStar == null || indexOfCountStar >= 0 &&
needRetraction);
- this.indexOfCountStar = indexOfCountStar;
}
@SuppressWarnings("unchecked")
@@ -179,7 +164,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
aggCallNeedRetractions,
needRetraction,
-
JavaScalaConversionUtil.toScala(Optional.ofNullable(indexOfCountStar)),
false, // isStateBackendDataViews
true); // needDistinctInfo
final AggregateInfoList globalAggInfoList =
@@ -188,7 +172,6 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
aggCallNeedRetractions,
needRetraction,
-
JavaScalaConversionUtil.toScala(Optional.ofNullable(indexOfCountStar)),
true, // isStateBackendDataViews
true); // needDistinctInfo
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
index 2e4dbb0..b0ef077 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
@@ -43,30 +43,22 @@ class StreamPhysicalGlobalGroupAggregate(
val aggCallNeedRetractions: Array[Boolean],
val localAggInputRowType: RelDataType,
val needRetraction: Boolean,
- val partialFinalType: PartialFinalType,
- indexOfCountStar: Option[Int] = Option.empty)
+ val partialFinalType: PartialFinalType)
extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel) {
- // if the indexOfCountStar is valid, the needRetraction should be true
- require(indexOfCountStar.isEmpty || indexOfCountStar.get >= 0 &&
needRetraction)
-
lazy val localAggInfoList: AggregateInfoList =
AggregateUtil.transformToStreamAggregateInfoList(
FlinkTypeFactory.toLogicalRowType(localAggInputRowType),
aggCalls,
aggCallNeedRetractions,
needRetraction,
- indexOfCountStar,
- isStateBackendDataViews = false,
- needDistinctInfo = true)
+ isStateBackendDataViews = false)
lazy val globalAggInfoList: AggregateInfoList =
AggregateUtil.transformToStreamAggregateInfoList(
FlinkTypeFactory.toLogicalRowType(localAggInputRowType),
aggCalls,
aggCallNeedRetractions,
needRetraction,
- indexOfCountStar,
- isStateBackendDataViews = true,
- needDistinctInfo = true)
+ isStateBackendDataViews = true)
override def requireWatermark: Boolean = false
@@ -83,8 +75,7 @@ class StreamPhysicalGlobalGroupAggregate(
aggCallNeedRetractions,
localAggInputRowType,
needRetraction,
- partialFinalType,
- indexOfCountStar)
+ partialFinalType)
}
override def explainTerms(pw: RelWriter): RelWriter = {
@@ -98,7 +89,6 @@ class StreamPhysicalGlobalGroupAggregate(
globalAggInfoList,
grouping,
isGlobal = true))
- .itemIf("indexOfCountStar", indexOfCountStar.getOrElse(-1),
indexOfCountStar.nonEmpty)
}
override def translateToExecNode(): ExecNode[_] = {
@@ -110,7 +100,6 @@ class StreamPhysicalGlobalGroupAggregate(
FlinkTypeFactory.toLogicalRowType(localAggInputRowType),
generateUpdateBefore,
needRetraction,
- indexOfCountStar.map(Integer.valueOf).orNull,
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
getRelDetailedDescription
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
index 4c579a2..235c441 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
@@ -102,18 +102,17 @@ class IncrementalAggregateRule
false))
finalGlobalAgg.copy(finalGlobalAgg.getTraitSet,
Collections.singletonList(newExchange))
} else {
- // adapt the needRetract of final global agg to be same as that of
partial agg
+ // an additional count1 is inserted, need to adapt the global agg
val localAggInfoList = AggregateUtil.transformToStreamAggregateInfoList(
// the final agg input is partial agg
FlinkTypeFactory.toLogicalRowType(partialGlobalAgg.getRowType),
finalRealAggCalls,
- // use partial global agg's aggCallNeedRetractions
- partialGlobalAgg.aggCallNeedRetractions,
- partialGlobalAgg.needRetraction,
- partialGlobalAgg.globalAggInfoList.indexOfCountStar,
+ // all the aggs do not need retraction
+ Array.fill(finalRealAggCalls.length)(false),
+ // also do not need count*
+ needInputCount = false,
// the local agg is not works on state
- isStateBackendDataViews = false,
- needDistinctInfo = true)
+ isStateBackendDataViews = false)
// check whether the global agg required input row type equals the incr
agg output row type
val globalAggInputAccType = AggregateUtil.inferLocalAggRowType(
@@ -134,11 +133,11 @@ class IncrementalAggregateRule
finalGlobalAgg.getRowType,
finalGlobalAgg.grouping,
finalRealAggCalls,
- partialGlobalAgg.aggCallNeedRetractions,
+ // all the aggs do not need retraction
+ Array.fill(finalRealAggCalls.length)(false),
finalGlobalAgg.localAggInputRowType,
- partialGlobalAgg.needRetraction,
- finalGlobalAgg.partialFinalType,
- partialGlobalAgg.globalAggInfoList.indexOfCountStar)
+ needRetraction = false,
+ finalGlobalAgg.partialFinalType)
}
call.transformTo(globalAgg)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 289d897..3125238 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -161,7 +161,6 @@ object AggregateUtil extends Enumeration {
Array.fill(aggregateCalls.size)(false),
orderKeyIndexes,
needInputCount = false,
- Option.empty[Int],
isStateBackedDataViews = false,
needDistinctInfo = false,
isBounded).aggInfos
@@ -273,7 +272,6 @@ object AggregateUtil extends Enumeration {
aggCallNeedRetractions,
orderKeyIndexes = null,
needInputCount,
- Option.empty[Int],
isStateBackendDataViews,
needDistinctInfo = true,
isBounded = false)
@@ -291,7 +289,6 @@ object AggregateUtil extends Enumeration {
Array.fill(aggregateCalls.size)(false),
orderKeyIndexes,
needInputCount = false,
- Option.empty[Int],
isStateBackedDataViews = false,
needDistinctInfo = false,
isBounded = true).aggInfos
@@ -321,7 +318,6 @@ object AggregateUtil extends Enumeration {
finalAggCallNeedRetractions,
orderKeyIndexes,
needInputCount = false,
- Option.empty[Int],
isStateBackedDataViews = false,
needDistinctInfo = false,
isBounded = true)
@@ -334,31 +330,12 @@ object AggregateUtil extends Enumeration {
needInputCount: Boolean,
isStateBackendDataViews: Boolean,
needDistinctInfo: Boolean = true): AggregateInfoList = {
- transformToStreamAggregateInfoList(
- inputRowType,
- aggregateCalls,
- aggCallNeedRetractions,
- needInputCount,
- Option.empty[Int],
- isStateBackendDataViews,
- needDistinctInfo)
- }
-
- def transformToStreamAggregateInfoList(
- inputRowType: RowType,
- aggregateCalls: Seq[AggregateCall],
- aggCallNeedRetractions: Array[Boolean],
- needInputCount: Boolean,
- indexOfExistingCountStar: Option[Int],
- isStateBackendDataViews: Boolean,
- needDistinctInfo: Boolean): AggregateInfoList = {
transformToAggregateInfoList(
inputRowType,
aggregateCalls,
aggCallNeedRetractions ++ Array(needInputCount), // for additional
count(*)
orderKeyIndexes = null,
needInputCount,
- indexOfExistingCountStar,
isStateBackendDataViews,
needDistinctInfo,
isBounded = false)
@@ -374,7 +351,6 @@ object AggregateUtil extends Enumeration {
* @param needInputCount whether need to calculate the input counts,
which is used in
* aggregation with retraction input.If needed,
* insert a count(1) aggregate into the agg list.
- * @param indexOfExistingCountStar the index for the existing count star
* @param isStateBackedDataViews whether the dataview in accumulator use
state or heap
* @param needDistinctInfo whether need to extract distinct information
*/
@@ -384,7 +360,6 @@ object AggregateUtil extends Enumeration {
aggCallNeedRetractions: Array[Boolean],
orderKeyIndexes: Array[Int],
needInputCount: Boolean,
- indexOfExistingCountStar: Option[Int],
isStateBackedDataViews: Boolean,
needDistinctInfo: Boolean,
isBounded: Boolean): AggregateInfoList = {
@@ -394,7 +369,6 @@ object AggregateUtil extends Enumeration {
// if not exist, insert a new count1 and remember the index
val (indexOfCountStar, countStarInserted, aggCalls) =
insertCountStarAggCall(
needInputCount,
- indexOfExistingCountStar,
aggregateCalls)
// Step-2:
@@ -662,19 +636,12 @@ object AggregateUtil extends Enumeration {
*
* @param needInputCount whether to insert an InputCount aggregate
* @param aggregateCalls original aggregate calls
- * @param indexOfExistingCountStar the index for the existing count star
* @return (indexOfCountStar, countStarInserted, newAggCalls)
*/
private def insertCountStarAggCall(
needInputCount: Boolean,
- indexOfExistingCountStar: Option[Int],
aggregateCalls: Seq[AggregateCall]): (Option[Int], Boolean,
Seq[AggregateCall]) = {
- if (indexOfExistingCountStar.getOrElse(-1) >= 0) {
- require(needInputCount)
- return (indexOfExistingCountStar, false, aggregateCalls)
- }
-
var indexOfCountStar: Option[Int] = None
var countStarInserted: Boolean = false
if (!needInputCount) {
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
index 5eecb92..85c3e43 100644
---
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
@@ -93,25 +93,4 @@ public class IncrementalAggregateJsonPlanTest extends
TableTestBase {
+ "count(distinct c) as c "
+ "from MyTable group by a");
}
-
- @Test
- public void testIncrementalAggregateWithSumCountDistinctAndRetraction() {
- String sinkTableDdl =
- "CREATE TABLE MySink (\n"
- + " b bigint,\n"
- + " sum_b int,\n"
- + " cnt_distinct_b bigint,\n"
- + " cnt1 bigint\n"
- + ") with (\n"
- + " 'connector' = 'values',\n"
- + " 'sink-insert-only' = 'false',\n"
- + " 'table-sink-class' = 'DEFAULT')";
- tEnv.executeSql(sinkTableDdl);
- util.verifyJsonPlan(
- "insert into MySink "
- + "select b, sum(b1), count(distinct b1), count(1) "
- + " from "
- + " (select a, count(b) as b, max(b) as b1 from
MyTable group by a)"
- + " group by b");
- }
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
deleted file mode 100644
index 379fc12..0000000
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
+++ /dev/null
@@ -1,745 +0,0 @@
-{
- "flinkVersion" : "",
- "nodes" : [ {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
- "scanTableSource" : {
- "identifier" : {
- "catalogName" : "default_catalog",
- "databaseName" : "default_database",
- "tableName" : "MyTable"
- },
- "catalogTable" : {
- "schema.3.data-type" : "BIGINT",
- "schema.2.data-type" : "VARCHAR(2147483647)",
- "schema.3.name" : "d",
- "connector" : "values",
- "schema.0.data-type" : "BIGINT",
- "schema.2.name" : "c",
- "schema.1.name" : "b",
- "bounded" : "false",
- "schema.0.name" : "a",
- "schema.1.data-type" : "INT NOT NULL"
- },
- "sourceAbilitySpecs" : [ {
- "type" : "ProjectPushDown",
- "projectedFields" : [ [ 0 ], [ 1 ] ],
- "producedType" : {
- "type" : "ROW",
- "nullable" : false,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "b" : "INT NOT NULL"
- } ]
- }
- } ]
- },
- "id" : 1,
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "b" : "INT NOT NULL"
- } ]
- },
- "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[a, b]]], fields=[a, b])",
- "inputProperties" : [ ]
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner",
- "miniBatchInterval" : {
- "interval" : 10000,
- "mode" : "ProcTime"
- },
- "id" : 2,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "b" : "INT NOT NULL"
- } ]
- },
- "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate",
- "grouping" : [ 0 ],
- "aggCalls" : [ {
- "name" : "b",
- "aggFunction" : {
- "name" : "COUNT",
- "kind" : "COUNT",
- "syntax" : "FUNCTION_STAR"
- },
- "argList" : [ ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- }, {
- "name" : "b1",
- "aggFunction" : {
- "name" : "MAX",
- "kind" : "MAX",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 1 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- } ],
- "aggCallNeedRetractions" : [ false, false ],
- "needRetraction" : false,
- "id" : 3,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "count1$0" : "BIGINT"
- }, {
- "max$1" : "INT"
- } ]
- },
- "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(*) AS
count1$0, MAX(b) AS max$1])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
- "id" : 4,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "HASH",
- "keys" : [ 0 ]
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "count1$0" : "BIGINT"
- }, {
- "max$1" : "INT"
- } ]
- },
- "description" : "Exchange(distribution=[hash[a]])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate",
- "grouping" : [ 0 ],
- "aggCalls" : [ {
- "name" : "b",
- "aggFunction" : {
- "name" : "COUNT",
- "kind" : "COUNT",
- "syntax" : "FUNCTION_STAR"
- },
- "argList" : [ ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- }, {
- "name" : "b1",
- "aggFunction" : {
- "name" : "MAX",
- "kind" : "MAX",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 1 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- } ],
- "aggCallNeedRetractions" : [ false, false ],
- "localAggInputRowType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "b" : "INT NOT NULL"
- } ]
- },
- "generateUpdateBefore" : true,
- "needRetraction" : false,
- "id" : 5,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "a" : "BIGINT"
- }, {
- "b" : "BIGINT NOT NULL"
- }, {
- "b1" : "INT NOT NULL"
- } ]
- },
- "description" : "GlobalGroupAggregate(groupBy=[a], select=[a,
COUNT(count1$0) AS b, MAX(max$1) AS b1])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
- "projection" : [ {
- "kind" : "INPUT_REF",
- "inputIndex" : 1,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- }, {
- "kind" : "INPUT_REF",
- "inputIndex" : 2,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- }, {
- "kind" : "REX_CALL",
- "operator" : {
- "name" : "MOD",
- "kind" : "MOD",
- "syntax" : "FUNCTION"
- },
- "operands" : [ {
- "kind" : "REX_CALL",
- "operator" : {
- "name" : "HASH_CODE",
- "kind" : "OTHER_FUNCTION",
- "syntax" : "FUNCTION"
- },
- "operands" : [ {
- "kind" : "INPUT_REF",
- "inputIndex" : 2,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- } ],
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- }, {
- "kind" : "LITERAL",
- "value" : "1024",
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- } ],
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- } ],
- "condition" : null,
- "id" : 6,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "b1" : "INT NOT NULL"
- }, {
- "$f2" : "INT NOT NULL"
- } ]
- },
- "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate",
- "grouping" : [ 0, 2 ],
- "aggCalls" : [ {
- "name" : null,
- "aggFunction" : {
- "name" : "SUM",
- "kind" : "SUM",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 1 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- }, {
- "name" : null,
- "aggFunction" : {
- "name" : "COUNT",
- "kind" : "COUNT",
- "syntax" : "FUNCTION_STAR"
- },
- "argList" : [ 1 ],
- "filterArg" : -1,
- "distinct" : true,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- }, {
- "name" : null,
- "aggFunction" : {
- "name" : "COUNT",
- "kind" : "COUNT",
- "syntax" : "FUNCTION_STAR"
- },
- "argList" : [ ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- } ],
- "aggCallNeedRetractions" : [ true, true, true ],
- "needRetraction" : true,
- "id" : 7,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "$f2" : "INT NOT NULL"
- }, {
- "sum$0" : "INT"
- }, {
- "count$1" : "BIGINT"
- }, {
- "count$2" : "BIGINT"
- }, {
- "count1$3" : "BIGINT"
- }, {
- "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView',
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
[...]
- } ]
- },
- "description" : "LocalGroupAggregate(groupBy=[b, $f2],
partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0,
count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS
count1$3, DISTINCT(b1) AS distinct$0])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
- "id" : 8,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "HASH",
- "keys" : [ 0, 1 ]
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "$f2" : "INT NOT NULL"
- }, {
- "sum$0" : "INT"
- }, {
- "count$1" : "BIGINT"
- }, {
- "count$2" : "BIGINT"
- }, {
- "count1$3" : "BIGINT"
- }, {
- "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView',
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
[...]
- } ]
- },
- "description" : "Exchange(distribution=[hash[b, $f2]])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate",
- "partialAggGrouping" : [ 0, 1 ],
- "finalAggGrouping" : [ 0 ],
- "partialOriginalAggCalls" : [ {
- "name" : null,
- "aggFunction" : {
- "name" : "SUM",
- "kind" : "SUM",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 1 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- }, {
- "name" : null,
- "aggFunction" : {
- "name" : "COUNT",
- "kind" : "COUNT",
- "syntax" : "FUNCTION_STAR"
- },
- "argList" : [ 1 ],
- "filterArg" : -1,
- "distinct" : true,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- }, {
- "name" : null,
- "aggFunction" : {
- "name" : "COUNT",
- "kind" : "COUNT",
- "syntax" : "FUNCTION_STAR"
- },
- "argList" : [ ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- } ],
- "partialAggCallNeedRetractions" : [ true, true, true ],
- "partialLocalAggInputRowType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "b1" : "INT NOT NULL"
- }, {
- "$f2" : "INT NOT NULL"
- } ]
- },
- "partialAggNeedRetraction" : true,
- "id" : 9,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "sum$0" : "INT"
- }, {
- "count$1" : "BIGINT"
- }, {
- "count$2" : "BIGINT"
- }, {
- "count1$3" : "BIGINT"
- } ]
- },
- "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2],
finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0,
count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3)
AS count1$3])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
- "id" : 10,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "HASH",
- "keys" : [ 0 ]
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "sum$0" : "INT"
- }, {
- "count$1" : "BIGINT"
- }, {
- "count$2" : "BIGINT"
- }, {
- "count1$3" : "BIGINT"
- } ]
- },
- "description" : "Exchange(distribution=[hash[b]])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate",
- "grouping" : [ 0 ],
- "aggCalls" : [ {
- "name" : null,
- "aggFunction" : {
- "name" : "SUM",
- "kind" : "SUM",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 2 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "INTEGER",
- "nullable" : false
- }
- }, {
- "name" : null,
- "aggFunction" : {
- "name" : "$SUM0",
- "kind" : "SUM0",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 3 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- }, {
- "name" : null,
- "aggFunction" : {
- "name" : "$SUM0",
- "kind" : "SUM0",
- "syntax" : "FUNCTION"
- },
- "argList" : [ 4 ],
- "filterArg" : -1,
- "distinct" : false,
- "approximate" : false,
- "ignoreNulls" : false,
- "type" : {
- "typeName" : "BIGINT",
- "nullable" : false
- }
- } ],
- "aggCallNeedRetractions" : [ true, true, true ],
- "localAggInputRowType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "$f2" : "INT NOT NULL"
- }, {
- "$f2_0" : "INT NOT NULL"
- }, {
- "$f3" : "BIGINT NOT NULL"
- }, {
- "$f4" : "BIGINT NOT NULL"
- } ]
- },
- "generateUpdateBefore" : true,
- "needRetraction" : true,
- "indexOfCountStar" : 2,
- "id" : 11,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "$f1" : "INT NOT NULL"
- }, {
- "$f2" : "BIGINT NOT NULL"
- }, {
- "$f3" : "BIGINT NOT NULL"
- } ]
- },
- "description" : "GlobalGroupAggregate(groupBy=[b],
partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1,
$SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3],
indexOfCountStar=[2])"
- }, {
- "class" :
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
- "dynamicTableSink" : {
- "identifier" : {
- "catalogName" : "default_catalog",
- "databaseName" : "default_database",
- "tableName" : "MySink"
- },
- "catalogTable" : {
- "schema.3.data-type" : "BIGINT",
- "sink-insert-only" : "false",
- "table-sink-class" : "DEFAULT",
- "schema.2.data-type" : "BIGINT",
- "schema.3.name" : "cnt1",
- "connector" : "values",
- "schema.0.data-type" : "BIGINT",
- "schema.2.name" : "cnt_distinct_b",
- "schema.1.name" : "sum_b",
- "schema.0.name" : "b",
- "schema.1.data-type" : "INT"
- }
- },
- "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
- "id" : 12,
- "inputProperties" : [ {
- "requiredDistribution" : {
- "type" : "UNKNOWN"
- },
- "damBehavior" : "PIPELINED",
- "priority" : 0
- } ],
- "outputType" : {
- "type" : "ROW",
- "nullable" : true,
- "fields" : [ {
- "b" : "BIGINT NOT NULL"
- }, {
- "$f1" : "INT NOT NULL"
- }, {
- "$f2" : "BIGINT NOT NULL"
- }, {
- "$f3" : "BIGINT NOT NULL"
- } ]
- },
- "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[b, $f1, $f2, $f3])"
- } ],
- "edges" : [ {
- "source" : 1,
- "target" : 2,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 2,
- "target" : 3,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 3,
- "target" : 4,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 4,
- "target" : 5,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 5,
- "target" : 6,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 6,
- "target" : 7,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 7,
- "target" : 8,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 8,
- "target" : 9,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 9,
- "target" : 10,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 10,
- "target" : 11,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- }, {
- "source" : 11,
- "target" : 12,
- "shuffle" : {
- "type" : "FORWARD"
- },
- "shuffleMode" : "PIPELINED"
- } ]
-}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index cc77008..6eadc7d 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -1613,43 +1613,6 @@ GlobalGroupAggregate(groupBy=[a], select=[a,
COUNT(distinct$0 count$0) AS EXPR$1
]]>
</Resource>
</TestCase>
- <TestCase
name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=true,
aggPhaseEnforcer=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
- SELECT
- a, COUNT(b) as b, MAX(b) as b1
- FROM MyTable
- GROUP BY a
-) GROUP BY b
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)],
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
- +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
- +- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b,
SUM_RETRACT($f2_0) AS $f1, $SUM0_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS
$f3], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D])
- +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b,
$f2, SUM_RETRACT(b1) AS $f2_0, COUNT_RETRACT(DISTINCT b1) AS $f3,
COUNT_RETRACT(*) AS $f4], changelogMode=[I,UB,UA,D])
- +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA])
- +- Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2],
changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b, MAX(b) AS
b1], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[a]], changelogMode=[I])
- +- Calc(select=[a, b], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime],
changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
<TestCase
name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true,
aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
<![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
@@ -1673,103 +1636,6 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL],
select=[a, $SUM0_RETRACT($
]]>
</Resource>
</TestCase>
- <TestCase
name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
-+- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
$SUM0_RETRACT(sum$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
-+- Exchange(distribution=[hash[a]])
- +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
$SUM0_RETRACT($f1) AS sum$0, $SUM0_RETRACT($f2) AS sum$1, COUNT_RETRACT(*) AS
count1$2])
- +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL],
select=[a, COUNT(distinct$0 count$0) AS $f1, COUNT(count$1) AS $f2])
- +- Exchange(distribution=[hash[a]])
- +- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL],
select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS
distinct$0])
- +- Calc(select=[a, b])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
-]]>
- </Resource>
- </TestCase>
- <TestCase
name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=false,
aggPhaseEnforcer=ONE_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
- SELECT
- a, COUNT(b) as b, MAX(b) as b1
- FROM MyTable
- GROUP BY a
-) GROUP BY b
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)],
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
- +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
- +- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-GroupAggregate(groupBy=[b], select=[b, SUM_RETRACT(b1) AS EXPR$1,
COUNT_RETRACT(DISTINCT b1) AS EXPR$2, COUNT_RETRACT(*) AS EXPR$3],
changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA])
- +- Calc(select=[b, b1], changelogMode=[I,UB,UA])
- +- GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b, MAX(b) AS b1],
changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[a]], changelogMode=[I])
- +- Calc(select=[a, b], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime],
changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
- <TestCase
name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=false,
aggPhaseEnforcer=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
- SELECT
- a, COUNT(b) as b, MAX(b) as b1
- FROM MyTable
- GROUP BY a
-) GROUP BY b
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)],
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
- +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
- +- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS
EXPR$1, COUNT_RETRACT(distinct$0 count$2) AS EXPR$2, COUNT_RETRACT(count1$3) AS
EXPR$3], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[b], select=[b, SUM_RETRACT(b1) AS (sum$0,
count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS
count1$3, DISTINCT(b1) AS distinct$0], changelogMode=[I])
- +- Calc(select=[b, b1], changelogMode=[I,UB,UA])
- +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b,
MAX(max$1) AS b1], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[a]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS
count$0, MAX(b) AS max$1], changelogMode=[I])
- +- Calc(select=[a, b], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime],
changelogMode=[I])
- +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
<TestCase
name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true,
aggPhaseEnforcer=ONE_PHASE]">
<Resource name="sql">
<![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM
MyTable GROUP BY c]]>
@@ -1795,43 +1661,28 @@ GroupAggregate(groupBy=[c], partialFinalType=[FINAL],
select=[c, SUM_RETRACT($f3
]]>
</Resource>
</TestCase>
- <TestCase
name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
+ <TestCase
name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
- <![CDATA[
-SELECT
- b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
- SELECT
- a, COUNT(b) as b, MAX(b) as b1
- FROM MyTable
- GROUP BY a
-) GROUP BY b
- ]]>
+ <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)],
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
- +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
- +- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
]]>
</Resource>
- <Resource name="optimized rel plan">
+ <Resource name="optimized exec plan">
<![CDATA[
-GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b,
SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(sum$2) AS $f2,
$SUM0_RETRACT(sum$3) AS $f3], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b,
SUM_RETRACT($f2_0) AS (sum$0, count$1), $SUM0_RETRACT($f3) AS sum$2,
$SUM0_RETRACT($f4) AS sum$3, COUNT_RETRACT(*) AS count1$4], changelogMode=[I])
- +- GlobalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL],
select=[b, $f2, SUM_RETRACT((sum$0, count$1)) AS $f2_0,
COUNT_RETRACT(distinct$0 count$2) AS $f3, COUNT_RETRACT(count1$3) AS $f4],
changelogMode=[I,UB,UA,D])
- +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[b, $f2],
partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0,
count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS
count1$3, DISTINCT(b1) AS distinct$0], changelogMode=[I])
- +- Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2],
changelogMode=[I,UB,UA])
- +- GlobalGroupAggregate(groupBy=[a], select=[a,
COUNT(count$0) AS b, MAX(max$1) AS b1], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[a]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[a], select=[a,
COUNT(b) AS count$0, MAX(b) AS max$1], changelogMode=[I])
- +- Calc(select=[a, b], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms],
mode=[ProcTime], changelogMode=[I])
- +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
$SUM0_RETRACT(sum$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
++- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
$SUM0_RETRACT($f1) AS sum$0, $SUM0_RETRACT($f2) AS sum$1, COUNT_RETRACT(*) AS
count1$2])
+ +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL],
select=[a, COUNT(distinct$0 count$0) AS $f1, COUNT(count$1) AS $f2])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL],
select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS
distinct$0])
+ +- Calc(select=[a, b])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 3cdb045..cf188ee 100644
---
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -318,7 +318,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)],
EXPR$2=[COUNT()])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
$SUM0_RETRACT(count$0) AS $f1, $SUM0_RETRACT(count1$1) AS $f2],
indexOfCountStar=[1], changelogMode=[I,UA,D])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a,
$SUM0(count$0) AS $f1, $SUM0(count1$1) AS $f2], changelogMode=[I,UA,D])
+- Exchange(distribution=[hash[a]], changelogMode=[I])
+- IncrementalGroupAggregate(partialAggGrouping=[a, $f2],
finalAggGrouping=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) AS count$0,
COUNT_RETRACT(count1$1) AS count1$1], changelogMode=[I])
+- Exchange(distribution=[hash[a, $f2]], changelogMode=[I])
@@ -435,45 +435,6 @@ GlobalGroupAggregate(groupBy=[a],
partialFinalType=[FINAL], select=[a, $SUM0(cou
]]>
</Resource>
</TestCase>
- <TestCase
name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
- <Resource name="sql">
- <![CDATA[
-SELECT
- b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
- SELECT
- a, COUNT(b) as b, MAX(b) as b1
- FROM MyTable
- GROUP BY a
-) GROUP BY b
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)],
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
- +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
- +- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b,
SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2,
$SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I])
- +- IncrementalGroupAggregate(partialAggGrouping=[b, $f2],
finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0,
count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3)
AS count1$3], changelogMode=[I])
- +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL],
select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0
b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0],
changelogMode=[I])
- +- Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2],
changelogMode=[I,UB,UA])
- +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0)
AS b, MAX(max$1) AS b1], changelogMode=[I,UB,UA])
- +- Exchange(distribution=[hash[a]], changelogMode=[I])
- +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b)
AS count$0, MAX(b) AS max$1], changelogMode=[I])
- +- Calc(select=[a, b], changelogMode=[I])
- +- MiniBatchAssigner(interval=[1000ms],
mode=[ProcTime], changelogMode=[I])
- +-
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
<TestCase
name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true,
aggPhaseEnforcer=TWO_PHASE]">
<Resource name="sql">
<![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index bc661d1..2f48206 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -170,22 +170,6 @@ class DistinctAggregateTest(
}
@Test
- def testSumCountWithSingleDistinctAndRetraction(): Unit = {
- val sqlQuery =
- s"""
- |SELECT
- | b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
- |FROM(
- | SELECT
- | a, COUNT(b) as b, MAX(b) as b1
- | FROM MyTable
- | GROUP BY a
- |) GROUP BY b
- """.stripMargin
- util.verifyRelPlan(sqlQuery, ExplainDetail.CHANGELOG_MODE)
- }
-
- @Test
def testMinMaxWithRetraction(): Unit = {
val sqlQuery =
s"""
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index e0042ba..804c832 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -297,58 +297,6 @@ class SplitAggregateITCase(
}
@Test
- def testCountWithSingleDistinctAndRetraction(): Unit = {
- // Test for FLINK-23434. The result is incorrect, because the global agg
on incremental agg
- // does not handle retraction message. While if binary mode is on, the
result is correct
- // event without this fix. Because mini-batch with binary mode will
compact retraction message
- // in this case.
- val t1 = tEnv.sqlQuery(
- s"""
- |SELECT
- | b, COUNT(DISTINCT b1), COUNT(1)
- |FROM(
- | SELECT
- | a, COUNT(b) as b, MAX(b) as b1
- | FROM T
- | GROUP BY a
- |) GROUP BY b
- """.stripMargin)
-
- val sink = new TestingRetractSink
- t1.toRetractStream[Row].addSink(sink)
- env.execute()
-
- val expected = List("2,2,2", "4,1,1", "8,1,1")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
- }
-
- @Test
- def testSumCountWithSingleDistinctAndRetraction(): Unit = {
- // Test for FLINK-23434. The plan and the result is incorrect, because sum
with retraction
- // will produce two acc values, while sum without retraction will produce
only one acc value,
- // the type consistent validation will fail in the
IncrementalAggregateRule because wrong
- // retraction flag is given.
- val t1 = tEnv.sqlQuery(
- s"""
- |SELECT
- | b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
- |FROM(
- | SELECT
- | a, COUNT(b) as b, MAX(b) as b1
- | FROM T
- | GROUP BY a
- |) GROUP BY b
- """.stripMargin)
-
- val sink = new TestingRetractSink
- t1.toRetractStream[Row].addSink(sink)
- env.execute()
-
- val expected = List("2,7,2,2", "4,6,1,1", "8,5,1,1")
- assertEquals(expected.sorted, sink.getRetractResults.sorted)
- }
-
- @Test
def testAggWithJoin(): Unit = {
val t1 = tEnv.sqlQuery(
s"""