This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ea2bddc  Revert "[FLINK-23434][table-planner-blink] Fix the inconsistent type in IncrementalAggregateRule when the query has one distinct agg function and count star agg function"
ea2bddc is described below

commit ea2bddc8e82ab4b9797f40534b1e5637eef4c5b5
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jul 23 17:04:51 2021 +0200

    Revert "[FLINK-23434][table-planner-blink] Fix the inconsistent type in 
IncrementalAggregateRule when the query has one distinct agg function and count 
star agg function"
    
    This reverts commit c8ad0ec1e5ad21da6b7663944d99f7e413aae0c9.
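For context: the reverted fix targeted queries that combine a distinct
aggregate with COUNT(*)/COUNT(1) on top of a retracting input. The shape of
such a query, quoted verbatim from the test cases removed below:

    val sqlQuery =
      """
        |SELECT
        |  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
        |FROM(
        |   SELECT
        |     a, COUNT(b) as b, MAX(b) as b1
        |   FROM MyTable
        |   GROUP BY a
        |) GROUP BY b
      """.stripMargin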
---
 .../stream/StreamExecGlobalGroupAggregate.java     |  17 -
 .../StreamPhysicalGlobalGroupAggregate.scala       |  19 +-
 .../physical/stream/IncrementalAggregateRule.scala |  21 +-
 .../table/planner/plan/utils/AggregateUtil.scala   |  33 -
 .../stream/IncrementalAggregateJsonPlanTest.java   |  21 -
 ...lAggregateWithSumCountDistinctAndRetraction.out | 745 ---------------------
 .../plan/stream/sql/agg/DistinctAggregateTest.xml  | 179 +----
 .../stream/sql/agg/IncrementalAggregateTest.xml    |  41 +-
 .../stream/sql/agg/DistinctAggregateTest.scala     |  16 -
 .../runtime/stream/sql/SplitAggregateITCase.scala  |  52 --
 10 files changed, 30 insertions(+), 1114 deletions(-)
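
The bulk of the revert is the removal of the indexOfCountStar plumbing listed
above: with the fix in place, callers could tell
AggregateUtil.transformToStreamAggregateInfoList where an existing COUNT(*)
sits so that no additional count1 accumulator was inserted. A sketch of the
two call shapes, lifted from the hunks below (argument values illustrative,
not runnable outside the planner):

    // with the reverted fix: reuse the existing COUNT(*) at a known index
    AggregateUtil.transformToStreamAggregateInfoList(
      inputRowType, aggCalls, aggCallNeedRetractions, needRetraction,
      indexOfCountStar,        // e.g. Some(2); parameter removed by this revert
      isStateBackendDataViews = false,
      needDistinctInfo = true)

    // after the revert: only the shorter overload remains, and the planner
    // inserts its own count1 whenever needInputCount asks for one
    AggregateUtil.transformToStreamAggregateInfoList(
      inputRowType, aggCalls, aggCallNeedRetractions, needRetraction,
      isStateBackendDataViews = false)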

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 67ccfab..e6062c0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -50,7 +50,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -60,12 +59,9 @@ import org.apache.calcite.tools.RelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,7 +71,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
     private static final Logger LOG = LoggerFactory.getLogger(StreamExecGlobalGroupAggregate.class);
 
     public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE = "localAggInputRowType";
-    public static final String FIELD_NAME_INDEX_OF_COUNT_STAR = "indexOfCountStar";
 
     @JsonProperty(FIELD_NAME_GROUPING)
     private final int[] grouping;
@@ -101,11 +96,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
     @JsonProperty(FIELD_NAME_NEED_RETRACTION)
     private final boolean needRetraction;
 
-    /** The position for the existing count star. */
-    @JsonProperty(FIELD_NAME_INDEX_OF_COUNT_STAR)
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    protected final Integer indexOfCountStar;
-
     public StreamExecGlobalGroupAggregate(
             int[] grouping,
             AggregateCall[] aggCalls,
@@ -113,7 +103,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
             RowType localAggInputRowType,
             boolean generateUpdateBefore,
             boolean needRetraction,
-            @Nullable Integer indexOfCountStar,
             InputProperty inputProperty,
             RowType outputType,
             String description) {
@@ -124,7 +113,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
                 localAggInputRowType,
                 generateUpdateBefore,
                 needRetraction,
-                indexOfCountStar,
                 getNewNodeId(),
                 Collections.singletonList(inputProperty),
                 outputType,
@@ -139,7 +127,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
             @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE) RowType localAggInputRowType,
             @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean generateUpdateBefore,
             @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
-            @JsonProperty(FIELD_NAME_INDEX_OF_COUNT_STAR) @Nullable Integer indexOfCountStar,
             @JsonProperty(FIELD_NAME_ID) int id,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@@ -152,8 +139,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
         this.localAggInputRowType = checkNotNull(localAggInputRowType);
         this.generateUpdateBefore = generateUpdateBefore;
         this.needRetraction = needRetraction;
-        checkArgument(indexOfCountStar == null || indexOfCountStar >= 0 && needRetraction);
-        this.indexOfCountStar = indexOfCountStar;
     }
 
     @SuppressWarnings("unchecked")
@@ -179,7 +164,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
                         JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                         aggCallNeedRetractions,
                         needRetraction,
-                        JavaScalaConversionUtil.toScala(Optional.ofNullable(indexOfCountStar)),
                         false, // isStateBackendDataViews
                         true); // needDistinctInfo
         final AggregateInfoList globalAggInfoList =
@@ -188,7 +172,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
                         JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                         aggCallNeedRetractions,
                         needRetraction,
-                        JavaScalaConversionUtil.toScala(Optional.ofNullable(indexOfCountStar)),
                         true, // isStateBackendDataViews
                         true); // needDistinctInfo
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
index 2e4dbb0..b0ef077 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalGlobalGroupAggregate.scala
@@ -43,30 +43,22 @@ class StreamPhysicalGlobalGroupAggregate(
     val aggCallNeedRetractions: Array[Boolean],
     val localAggInputRowType: RelDataType,
     val needRetraction: Boolean,
-    val partialFinalType: PartialFinalType,
-    indexOfCountStar: Option[Int] = Option.empty)
+    val partialFinalType: PartialFinalType)
   extends StreamPhysicalGroupAggregateBase(cluster, traitSet, inputRel) {
 
-  // if the indexOfCountStar is valid, the needRetraction should be true
-  require(indexOfCountStar.isEmpty || indexOfCountStar.get >= 0 && needRetraction)
-
   lazy val localAggInfoList: AggregateInfoList = AggregateUtil.transformToStreamAggregateInfoList(
     FlinkTypeFactory.toLogicalRowType(localAggInputRowType),
     aggCalls,
     aggCallNeedRetractions,
     needRetraction,
-    indexOfCountStar,
-    isStateBackendDataViews = false,
-    needDistinctInfo = true)
+    isStateBackendDataViews = false)
 
   lazy val globalAggInfoList: AggregateInfoList = AggregateUtil.transformToStreamAggregateInfoList(
     FlinkTypeFactory.toLogicalRowType(localAggInputRowType),
     aggCalls,
     aggCallNeedRetractions,
     needRetraction,
-    indexOfCountStar,
-    isStateBackendDataViews = true,
-    needDistinctInfo = true)
+    isStateBackendDataViews = true)
 
   override def requireWatermark: Boolean = false
 
@@ -83,8 +75,7 @@ class StreamPhysicalGlobalGroupAggregate(
       aggCallNeedRetractions,
       localAggInputRowType,
       needRetraction,
-      partialFinalType,
-      indexOfCountStar)
+      partialFinalType)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -98,7 +89,6 @@ class StreamPhysicalGlobalGroupAggregate(
         globalAggInfoList,
         grouping,
         isGlobal = true))
-      .itemIf("indexOfCountStar", indexOfCountStar.getOrElse(-1), 
indexOfCountStar.nonEmpty)
   }
 
   override def translateToExecNode(): ExecNode[_] = {
@@ -110,7 +100,6 @@ class StreamPhysicalGlobalGroupAggregate(
       FlinkTypeFactory.toLogicalRowType(localAggInputRowType),
       generateUpdateBefore,
       needRetraction,
-      indexOfCountStar.map(Integer.valueOf).orNull,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
index 4c579a2..235c441 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/IncrementalAggregateRule.scala
@@ -102,18 +102,17 @@ class IncrementalAggregateRule
         false))
      finalGlobalAgg.copy(finalGlobalAgg.getTraitSet, Collections.singletonList(newExchange))
     } else {
-      // adapt the needRetract of final global agg to be same as that of partial agg
+      // an additional count1 is inserted, need to adapt the global agg
       val localAggInfoList = AggregateUtil.transformToStreamAggregateInfoList(
         // the final agg input is partial agg
         FlinkTypeFactory.toLogicalRowType(partialGlobalAgg.getRowType),
         finalRealAggCalls,
-        // use partial global agg's aggCallNeedRetractions
-        partialGlobalAgg.aggCallNeedRetractions,
-        partialGlobalAgg.needRetraction,
-        partialGlobalAgg.globalAggInfoList.indexOfCountStar,
+        // all the aggs do not need retraction
+        Array.fill(finalRealAggCalls.length)(false),
+        // also do not need count*
+        needInputCount = false,
         // the local agg is not works on state
-        isStateBackendDataViews = false,
-        needDistinctInfo = true)
+        isStateBackendDataViews = false)
 
      // check whether the global agg required input row type equals the incr agg output row type
       val globalAggInputAccType = AggregateUtil.inferLocalAggRowType(
@@ -134,11 +133,11 @@ class IncrementalAggregateRule
         finalGlobalAgg.getRowType,
         finalGlobalAgg.grouping,
         finalRealAggCalls,
-        partialGlobalAgg.aggCallNeedRetractions,
+        // all the aggs do not need retraction
+        Array.fill(finalRealAggCalls.length)(false),
         finalGlobalAgg.localAggInputRowType,
-        partialGlobalAgg.needRetraction,
-        finalGlobalAgg.partialFinalType,
-        partialGlobalAgg.globalAggInfoList.indexOfCountStar)
+        needRetraction = false,
+        finalGlobalAgg.partialFinalType)
     }
 
     call.transformTo(globalAgg)
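
The row-type check above is where FLINK-23434 surfaced: as the comment in the
removed SplitAggregateITCase test explains further down, SUM with retraction
keeps two accumulator fields while SUM without retraction keeps one, so
deriving the final global agg with different retraction flags than the
partial agg changes the inferred accumulator row type. A simplified
illustration (these case classes are stand-ins, not the planner's generated
accumulators):

    case class SumAcc(sum: Long)                      // SUM on append-only input
    case class SumRetractAcc(sum: Long, count: Long)  // SUM_RETRACT on retracting input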
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 289d897..3125238 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -161,7 +161,6 @@ object AggregateUtil extends Enumeration {
       Array.fill(aggregateCalls.size)(false),
       orderKeyIndexes,
       needInputCount = false,
-      Option.empty[Int],
       isStateBackedDataViews = false,
       needDistinctInfo = false,
       isBounded).aggInfos
@@ -273,7 +272,6 @@ object AggregateUtil extends Enumeration {
       aggCallNeedRetractions,
       orderKeyIndexes = null,
       needInputCount,
-      Option.empty[Int],
       isStateBackendDataViews,
       needDistinctInfo = true,
       isBounded = false)
@@ -291,7 +289,6 @@ object AggregateUtil extends Enumeration {
       Array.fill(aggregateCalls.size)(false),
       orderKeyIndexes,
       needInputCount = false,
-      Option.empty[Int],
       isStateBackedDataViews = false,
       needDistinctInfo = false,
       isBounded = true).aggInfos
@@ -321,7 +318,6 @@ object AggregateUtil extends Enumeration {
       finalAggCallNeedRetractions,
       orderKeyIndexes,
       needInputCount = false,
-      Option.empty[Int],
       isStateBackedDataViews = false,
       needDistinctInfo = false,
       isBounded = true)
@@ -334,31 +330,12 @@ object AggregateUtil extends Enumeration {
       needInputCount: Boolean,
       isStateBackendDataViews: Boolean,
       needDistinctInfo: Boolean = true): AggregateInfoList = {
-    transformToStreamAggregateInfoList(
-      inputRowType,
-      aggregateCalls,
-      aggCallNeedRetractions,
-      needInputCount,
-      Option.empty[Int],
-      isStateBackendDataViews,
-      needDistinctInfo)
-  }
-
-  def transformToStreamAggregateInfoList(
-      inputRowType: RowType,
-      aggregateCalls: Seq[AggregateCall],
-      aggCallNeedRetractions: Array[Boolean],
-      needInputCount: Boolean,
-      indexOfExistingCountStar: Option[Int],
-      isStateBackendDataViews: Boolean,
-      needDistinctInfo: Boolean): AggregateInfoList = {
     transformToAggregateInfoList(
       inputRowType,
       aggregateCalls,
      aggCallNeedRetractions ++ Array(needInputCount), // for additional count(*)
       orderKeyIndexes = null,
       needInputCount,
-      indexOfExistingCountStar,
       isStateBackendDataViews,
       needDistinctInfo,
       isBounded = false)
@@ -374,7 +351,6 @@ object AggregateUtil extends Enumeration {
     * @param needInputCount   whether need to calculate the input counts, which is used in
     *                         aggregation with retraction input.If needed,
     *                         insert a count(1) aggregate into the agg list.
-    * @param indexOfExistingCountStar the index for the existing count star
     * @param isStateBackedDataViews   whether the dataview in accumulator use state or heap
     * @param needDistinctInfo  whether need to extract distinct information
     */
@@ -384,7 +360,6 @@ object AggregateUtil extends Enumeration {
       aggCallNeedRetractions: Array[Boolean],
       orderKeyIndexes: Array[Int],
       needInputCount: Boolean,
-      indexOfExistingCountStar: Option[Int],
       isStateBackedDataViews: Boolean,
       needDistinctInfo: Boolean,
       isBounded: Boolean): AggregateInfoList = {
@@ -394,7 +369,6 @@ object AggregateUtil extends Enumeration {
     // if not exist, insert a new count1 and remember the index
     val (indexOfCountStar, countStarInserted, aggCalls) = insertCountStarAggCall(
       needInputCount,
-      indexOfExistingCountStar,
       aggregateCalls)
 
     // Step-2:
@@ -662,19 +636,12 @@ object AggregateUtil extends Enumeration {
     *
     * @param needInputCount whether to insert an InputCount aggregate
     * @param aggregateCalls original aggregate calls
-    * @param indexOfExistingCountStar the index for the existing count star
     * @return (indexOfCountStar, countStarInserted, newAggCalls)
     */
   private def insertCountStarAggCall(
       needInputCount: Boolean,
-      indexOfExistingCountStar: Option[Int],
       aggregateCalls: Seq[AggregateCall]): (Option[Int], Boolean, Seq[AggregateCall]) = {
 
-    if (indexOfExistingCountStar.getOrElse(-1) >= 0) {
-      require(needInputCount)
-      return (indexOfExistingCountStar, false, aggregateCalls)
-    }
-
     var indexOfCountStar: Option[Int] = None
     var countStarInserted: Boolean = false
     if (!needInputCount) {
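
The branch removed above short-circuited count1 insertion when the caller
already carried a COUNT(*). A self-contained sketch of the reverted
behaviour, with a type parameter standing in for AggregateCall and the
append-at-the-end fallback assumed from the "for additional count(*)"
comment earlier in this file:

    def insertCountStarSketch[T](
        needInputCount: Boolean,
        indexOfExistingCountStar: Option[Int],
        aggregateCalls: Seq[T],
        countStar: T): (Option[Int], Boolean, Seq[T]) = {
      if (indexOfExistingCountStar.exists(_ >= 0)) {
        require(needInputCount)                            // mirrors the removed guard
        (indexOfExistingCountStar, false, aggregateCalls)  // reuse, nothing inserted
      } else if (!needInputCount) {
        (None, false, aggregateCalls)                      // no count(*) required
      } else {
        (Some(aggregateCalls.size), true, aggregateCalls :+ countStar)
      }
    }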
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
index 5eecb92..85c3e43 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest.java
@@ -93,25 +93,4 @@ public class IncrementalAggregateJsonPlanTest extends TableTestBase {
                         + "count(distinct c) as c "
                         + "from MyTable group by a");
     }
-
-    @Test
-    public void testIncrementalAggregateWithSumCountDistinctAndRetraction() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  b bigint,\n"
-                        + "  sum_b int,\n"
-                        + "  cnt_distinct_b bigint,\n"
-                        + "  cnt1 bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink "
-                        + "select b, sum(b1), count(distinct b1), count(1) "
-                        + " from "
-                        + "   (select a, count(b) as b, max(b) as b1 from 
MyTable group by a)"
-                        + " group by b");
-    }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
deleted file mode 100644
index 379fc12..0000000
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
+++ /dev/null
@@ -1,745 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-    "scanTableSource" : {
-      "identifier" : {
-        "catalogName" : "default_catalog",
-        "databaseName" : "default_database",
-        "tableName" : "MyTable"
-      },
-      "catalogTable" : {
-        "schema.3.data-type" : "BIGINT",
-        "schema.2.data-type" : "VARCHAR(2147483647)",
-        "schema.3.name" : "d",
-        "connector" : "values",
-        "schema.0.data-type" : "BIGINT",
-        "schema.2.name" : "c",
-        "schema.1.name" : "b",
-        "bounded" : "false",
-        "schema.0.name" : "a",
-        "schema.1.data-type" : "INT NOT NULL"
-      },
-      "sourceAbilitySpecs" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : {
-          "type" : "ROW",
-          "nullable" : false,
-          "fields" : [ {
-            "a" : "BIGINT"
-          }, {
-            "b" : "INT NOT NULL"
-          } ]
-        }
-      } ]
-    },
-    "id" : 1,
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "a" : "BIGINT"
-      }, {
-        "b" : "INT NOT NULL"
-      } ]
-    },
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b]]], fields=[a, b])",
-    "inputProperties" : [ ]
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "id" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "a" : "BIGINT"
-      }, {
-        "b" : "INT NOT NULL"
-      } ]
-    },
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate",
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    }, {
-      "name" : "b1",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    } ],
-    "aggCallNeedRetractions" : [ false, false ],
-    "needRetraction" : false,
-    "id" : 3,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "a" : "BIGINT"
-      }, {
-        "count1$0" : "BIGINT"
-      }, {
-        "max$1" : "INT"
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(*) AS 
count1$0, MAX(b) AS max$1])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
-    "id" : 4,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "a" : "BIGINT"
-      }, {
-        "count1$0" : "BIGINT"
-      }, {
-        "max$1" : "INT"
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[a]])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate",
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    }, {
-      "name" : "b1",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    } ],
-    "aggCallNeedRetractions" : [ false, false ],
-    "localAggInputRowType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "a" : "BIGINT"
-      }, {
-        "b" : "INT NOT NULL"
-      } ]
-    },
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "id" : 5,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "a" : "BIGINT"
-      }, {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "b1" : "INT NOT NULL"
-      } ]
-    },
-    "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, 
COUNT(count1$0) AS b, MAX(max$1) AS b1])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    }, {
-      "kind" : "REX_CALL",
-      "operator" : {
-        "name" : "MOD",
-        "kind" : "MOD",
-        "syntax" : "FUNCTION"
-      },
-      "operands" : [ {
-        "kind" : "REX_CALL",
-        "operator" : {
-          "name" : "HASH_CODE",
-          "kind" : "OTHER_FUNCTION",
-          "syntax" : "FUNCTION"
-        },
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 2,
-          "type" : {
-            "typeName" : "INTEGER",
-            "nullable" : false
-          }
-        } ],
-        "type" : {
-          "typeName" : "INTEGER",
-          "nullable" : false
-        }
-      }, {
-        "kind" : "LITERAL",
-        "value" : "1024",
-        "type" : {
-          "typeName" : "INTEGER",
-          "nullable" : false
-        }
-      } ],
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    } ],
-    "condition" : null,
-    "id" : 6,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "b1" : "INT NOT NULL"
-      }, {
-        "$f2" : "INT NOT NULL"
-      } ]
-    },
-    "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate",
-    "grouping" : [ 0, 2 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    }, {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    }, {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    } ],
-    "aggCallNeedRetractions" : [ true, true, true ],
-    "needRetraction" : true,
-    "id" : 7,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "$f2" : "INT NOT NULL"
-      }, {
-        "sum$0" : "INT"
-      }, {
-        "count$1" : "BIGINT"
-      }, {
-        "count$2" : "BIGINT"
-      }, {
-        "count1$3" : "BIGINT"
-      }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[b, $f2], 
partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, 
count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS 
count1$3, DISTINCT(b1) AS distinct$0])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
-    "id" : 8,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "$f2" : "INT NOT NULL"
-      }, {
-        "sum$0" : "INT"
-      }, {
-        "count$1" : "BIGINT"
-      }, {
-        "count$2" : "BIGINT"
-      }, {
-        "count1$3" : "BIGINT"
-      }, {
-        "distinct$0" : "RAW('org.apache.flink.table.api.dataview.MapView', 
'AFZvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnJ1bnRpbWUudHlwZXV0aWxzLkV4dGVybmFsU2VyaWFsaXplciRFeHRlcm5hbFNlcmlhbGl6ZXJTbmFwc2hvdAAAAAMADecEAAAAAaztAAVzcgArb3JnLmFwYWNoZS5mbGluay50YWJsZS50eXBlcy5GaWVsZHNEYXRhVHlwZfSwrBytgZ9fAgABTAAOZmllbGREYXRhVHlwZXN0ABBMamF2YS91dGlsL0xpc3Q7eHIAJW9yZy5hcGFjaGUuZmxpbmsudGFibGUudHlwZXMuRGF0YVR5cGV5y2rIj5/EeAIAAkwAD2NvbnZlcnNpb25DbGFzc3QAEUxqYXZhL2xhbmcvQ2xhc3M7TAALbG9naWNhbFR5cGV0ADJMb3JnL2Fw
 [...]
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[b, $f2]])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate",
-    "partialAggGrouping" : [ 0, 1 ],
-    "finalAggGrouping" : [ 0 ],
-    "partialOriginalAggCalls" : [ {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    }, {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    }, {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    } ],
-    "partialAggCallNeedRetractions" : [ true, true, true ],
-    "partialLocalAggInputRowType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "b1" : "INT NOT NULL"
-      }, {
-        "$f2" : "INT NOT NULL"
-      } ]
-    },
-    "partialAggNeedRetraction" : true,
-    "id" : 9,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "sum$0" : "INT"
-      }, {
-        "count$1" : "BIGINT"
-      }, {
-        "count$2" : "BIGINT"
-      }, {
-        "count1$3" : "BIGINT"
-      } ]
-    },
-    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], 
finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, 
count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) 
AS count1$3])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange",
-    "id" : 10,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "sum$0" : "INT"
-      }, {
-        "count$1" : "BIGINT"
-      }, {
-        "count$2" : "BIGINT"
-      }, {
-        "count1$3" : "BIGINT"
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate",
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 2 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "INTEGER",
-        "nullable" : false
-      }
-    }, {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    }, {
-      "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
-      "argList" : [ 4 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : {
-        "typeName" : "BIGINT",
-        "nullable" : false
-      }
-    } ],
-    "aggCallNeedRetractions" : [ true, true, true ],
-    "localAggInputRowType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "$f2" : "INT NOT NULL"
-      }, {
-        "$f2_0" : "INT NOT NULL"
-      }, {
-        "$f3" : "BIGINT NOT NULL"
-      }, {
-        "$f4" : "BIGINT NOT NULL"
-      } ]
-    },
-    "generateUpdateBefore" : true,
-    "needRetraction" : true,
-    "indexOfCountStar" : 2,
-    "id" : 11,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "$f1" : "INT NOT NULL"
-      }, {
-        "$f2" : "BIGINT NOT NULL"
-      }, {
-        "$f3" : "BIGINT NOT NULL"
-      } ]
-    },
-    "description" : "GlobalGroupAggregate(groupBy=[b], 
partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, 
$SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], 
indexOfCountStar=[2])"
-  }, {
-    "class" : 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink",
-    "dynamicTableSink" : {
-      "identifier" : {
-        "catalogName" : "default_catalog",
-        "databaseName" : "default_database",
-        "tableName" : "MySink"
-      },
-      "catalogTable" : {
-        "schema.3.data-type" : "BIGINT",
-        "sink-insert-only" : "false",
-        "table-sink-class" : "DEFAULT",
-        "schema.2.data-type" : "BIGINT",
-        "schema.3.name" : "cnt1",
-        "connector" : "values",
-        "schema.0.data-type" : "BIGINT",
-        "schema.2.name" : "cnt_distinct_b",
-        "schema.1.name" : "sum_b",
-        "schema.0.name" : "b",
-        "schema.1.data-type" : "INT"
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
-    "id" : 12,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "nullable" : true,
-      "fields" : [ {
-        "b" : "BIGINT NOT NULL"
-      }, {
-        "$f1" : "INT NOT NULL"
-      }, {
-        "$f2" : "BIGINT NOT NULL"
-      }, {
-        "$f3" : "BIGINT NOT NULL"
-      } ]
-    },
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, $f1, $f2, $f3])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 9,
-    "target" : 10,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 10,
-    "target" : 11,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 11,
-    "target" : 12,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
index cc77008..6eadc7d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.xml
@@ -1613,43 +1613,6 @@ GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(distinct$0 count$0) AS EXPR$1
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
-   SELECT
-     a, COUNT(b) as b, MAX(b) as b1
-   FROM MyTable
-   GROUP BY a
-) GROUP BY b
-       ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)], 
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
-   +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
-      +- LogicalProject(a=[$0], b=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-GroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, 
SUM_RETRACT($f2_0) AS $f1, $SUM0_RETRACT($f3) AS $f2, $SUM0_RETRACT($f4) AS 
$f3], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA,D])
-   +- GroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, 
$f2, SUM_RETRACT(b1) AS $f2_0, COUNT_RETRACT(DISTINCT b1) AS $f3, 
COUNT_RETRACT(*) AS $f4], changelogMode=[I,UB,UA,D])
-      +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I,UB,UA])
-         +- Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2], 
changelogMode=[I,UB,UA])
-            +- GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b, MAX(b) AS 
b1], changelogMode=[I,UB,UA])
-               +- Exchange(distribution=[hash[a]], changelogMode=[I])
-                  +- Calc(select=[a, b], changelogMode=[I])
-                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], 
changelogMode=[I])
-                        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
       <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
@@ -1673,103 +1636,6 @@ GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT($
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
-+- LogicalProject(a=[$0], b=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
$SUM0_RETRACT(sum$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
-+- Exchange(distribution=[hash[a]])
-   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
$SUM0_RETRACT($f1) AS sum$0, $SUM0_RETRACT($f2) AS sum$1, COUNT_RETRACT(*) AS 
count1$2])
-      +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], 
select=[a, COUNT(distinct$0 count$0) AS $f1, COUNT(count$1) AS $f2])
-         +- Exchange(distribution=[hash[a]])
-            +- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], 
select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS 
distinct$0])
-               +- Calc(select=[a, b])
-                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
-                     +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=false, aggPhaseEnforcer=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
-   SELECT
-     a, COUNT(b) as b, MAX(b) as b1
-   FROM MyTable
-   GROUP BY a
-) GROUP BY b
-       ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)], 
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
-   +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
-      +- LogicalProject(a=[$0], b=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-GroupAggregate(groupBy=[b], select=[b, SUM_RETRACT(b1) AS EXPR$1, 
COUNT_RETRACT(DISTINCT b1) AS EXPR$2, COUNT_RETRACT(*) AS EXPR$3], 
changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I,UB,UA])
-   +- Calc(select=[b, b1], changelogMode=[I,UB,UA])
-      +- GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b, MAX(b) AS b1], 
changelogMode=[I,UB,UA])
-         +- Exchange(distribution=[hash[a]], changelogMode=[I])
-            +- Calc(select=[a, b], changelogMode=[I])
-               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], 
changelogMode=[I])
-                  +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=false, aggPhaseEnforcer=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
-   SELECT
-     a, COUNT(b) as b, MAX(b) as b1
-   FROM MyTable
-   GROUP BY a
-) GROUP BY b
-       ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)], 
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
-   +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
-      +- LogicalProject(a=[$0], b=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS 
EXPR$1, COUNT_RETRACT(distinct$0 count$2) AS EXPR$2, COUNT_RETRACT(count1$3) AS 
EXPR$3], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I])
-   +- LocalGroupAggregate(groupBy=[b], select=[b, SUM_RETRACT(b1) AS (sum$0, 
count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS 
count1$3, DISTINCT(b1) AS distinct$0], changelogMode=[I])
-      +- Calc(select=[b, b1], changelogMode=[I,UB,UA])
-         +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b, 
MAX(max$1) AS b1], changelogMode=[I,UB,UA])
-            +- Exchange(distribution=[hash[a]], changelogMode=[I])
-               +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS 
count$0, MAX(b) AS max$1], changelogMode=[I])
-                  +- Calc(select=[a, b], changelogMode=[I])
-                     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], 
changelogMode=[I])
-                        +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
  <TestCase name="testTwoDistinctAggregateWithNonDistinctAgg[splitDistinctAggEnabled=true, aggPhaseEnforcer=ONE_PHASE]">
     <Resource name="sql">
       <![CDATA[SELECT c, SUM(DISTINCT a), SUM(a), COUNT(DISTINCT b) FROM 
MyTable GROUP BY c]]>
@@ -1795,43 +1661,28 @@ GroupAggregate(groupBy=[c], partialFinalType=[FINAL], select=[c, SUM_RETRACT($f3
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
+  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[
-SELECT
-  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
-   SELECT
-     a, COUNT(b) as b, MAX(b) as b1
-   FROM MyTable
-   GROUP BY a
-) GROUP BY b
-       ]]>
+      <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)], 
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
-   +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
-      +- LogicalProject(a=[$0], b=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $0)], EXPR$2=[COUNT($1)])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
-    <Resource name="optimized rel plan">
+    <Resource name="optimized exec plan">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, 
SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(sum$2) AS $f2, 
$SUM0_RETRACT(sum$3) AS $f3], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I])
-   +- LocalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, 
SUM_RETRACT($f2_0) AS (sum$0, count$1), $SUM0_RETRACT($f3) AS sum$2, 
$SUM0_RETRACT($f4) AS sum$3, COUNT_RETRACT(*) AS count1$4], changelogMode=[I])
-      +- GlobalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], 
select=[b, $f2, SUM_RETRACT((sum$0, count$1)) AS $f2_0, 
COUNT_RETRACT(distinct$0 count$2) AS $f3, COUNT_RETRACT(count1$3) AS $f4], 
changelogMode=[I,UB,UA,D])
-         +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I])
-            +- LocalGroupAggregate(groupBy=[b, $f2], 
partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, 
count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS 
count1$3, DISTINCT(b1) AS distinct$0], changelogMode=[I])
-               +- Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2], 
changelogMode=[I,UB,UA])
-                  +- GlobalGroupAggregate(groupBy=[a], select=[a, 
COUNT(count$0) AS b, MAX(max$1) AS b1], changelogMode=[I,UB,UA])
-                     +- Exchange(distribution=[hash[a]], changelogMode=[I])
-                        +- LocalGroupAggregate(groupBy=[a], select=[a, 
COUNT(b) AS count$0, MAX(b) AS max$1], changelogMode=[I])
-                           +- Calc(select=[a, b], changelogMode=[I])
-                              +- MiniBatchAssigner(interval=[1000ms], 
mode=[ProcTime], changelogMode=[I])
-                                 +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
$SUM0_RETRACT(sum$0) AS $f1, $SUM0_RETRACT(sum$1) AS $f2])
++- Exchange(distribution=[hash[a]])
+   +- LocalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
$SUM0_RETRACT($f1) AS sum$0, $SUM0_RETRACT($f2) AS sum$1, COUNT_RETRACT(*) AS 
count1$2])
+      +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], 
select=[a, COUNT(distinct$0 count$0) AS $f1, COUNT(count$1) AS $f2])
+         +- Exchange(distribution=[hash[a]])
+            +- LocalGroupAggregate(groupBy=[a], partialFinalType=[PARTIAL], 
select=[a, COUNT(distinct$0 a) AS count$0, COUNT(b) AS count$1, DISTINCT(a) AS 
distinct$0])
+               +- Calc(select=[a, b])
+                  +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+                     +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
index 3cdb045..cf188ee 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/IncrementalAggregateTest.xml
@@ -318,7 +318,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT()])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0_RETRACT(count$0) AS $f1, $SUM0_RETRACT(count1$1) AS $f2], indexOfCountStar=[1], changelogMode=[I,UA,D])
+GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(count$0) AS $f1, $SUM0(count1$1) AS $f2], changelogMode=[I,UA,D])
 +- Exchange(distribution=[hash[a]], changelogMode=[I])
    +- IncrementalGroupAggregate(partialAggGrouping=[a, $f2], finalAggGrouping=[a], select=[a, COUNT_RETRACT(distinct$0 count$0) AS count$0, COUNT_RETRACT(count1$1) AS count1$1], changelogMode=[I])
       +- Exchange(distribution=[hash[a, $f2]], changelogMode=[I])
@@ -435,45 +435,6 @@ GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, $SUM0(cou
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSumCountWithSingleDistinctAndRetraction[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-SELECT
-  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-FROM(
-   SELECT
-     a, COUNT(b) as b, MAX(b) as b1
-   FROM MyTable
-   GROUP BY a
-) GROUP BY b
-       ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)], EXPR$2=[COUNT(DISTINCT $1)], 
EXPR$3=[COUNT()])
-+- LogicalProject(b=[$1], b1=[$2])
-   +- LogicalAggregate(group=[{0}], b=[COUNT($1)], b1=[MAX($1)])
-      +- LogicalProject(a=[$0], b=[$1])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, 
SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, 
$SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[b]], changelogMode=[I])
-   +- IncrementalGroupAggregate(partialAggGrouping=[b, $f2], 
finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, 
count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) 
AS count1$3], changelogMode=[I])
-      +- Exchange(distribution=[hash[b, $f2]], changelogMode=[I])
-         +- LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], 
select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 
b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0], 
changelogMode=[I])
-            +- Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2], 
changelogMode=[I,UB,UA])
-               +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) 
AS b, MAX(max$1) AS b1], changelogMode=[I,UB,UA])
-                  +- Exchange(distribution=[hash[a]], changelogMode=[I])
-                     +- LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) 
AS count$0, MAX(b) AS max$1], changelogMode=[I])
-                        +- Calc(select=[a, b], changelogMode=[I])
-                           +- MiniBatchAssigner(interval=[1000ms], 
mode=[ProcTime], changelogMode=[I])
-                              +- 
LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
  <TestCase name="testSomeColumnsBothInDistinctAggAndGroupBy[splitDistinctAggEnabled=true, aggPhaseEnforcer=TWO_PHASE]">
     <Resource name="sql">
       <![CDATA[SELECT a, COUNT(DISTINCT a), COUNT(b) FROM MyTable GROUP BY a]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
index bc661d1..2f48206 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/DistinctAggregateTest.scala
@@ -170,22 +170,6 @@ class DistinctAggregateTest(
   }
 
   @Test
-  def testSumCountWithSingleDistinctAndRetraction(): Unit = {
-    val sqlQuery =
-      s"""
-         |SELECT
-         |  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-         |FROM(
-         |   SELECT
-         |     a, COUNT(b) as b, MAX(b) as b1
-         |   FROM MyTable
-         |   GROUP BY a
-         |) GROUP BY b
-       """.stripMargin
-    util.verifyRelPlan(sqlQuery, ExplainDetail.CHANGELOG_MODE)
-  }
-
-  @Test
   def testMinMaxWithRetraction(): Unit = {
     val sqlQuery =
       s"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
index e0042ba..804c832 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SplitAggregateITCase.scala
@@ -297,58 +297,6 @@ class SplitAggregateITCase(
   }
 
   @Test
-  def testCountWithSingleDistinctAndRetraction(): Unit = {
-    // Test for FLINK-23434. The result is incorrect, because the global agg on incremental agg
-    // does not handle retraction message. While if binary mode is on, the result is correct
-    // event without this fix. Because mini-batch with binary mode will compact retraction message
-    // in this case.
-    val t1 = tEnv.sqlQuery(
-      s"""
-         |SELECT
-         |  b, COUNT(DISTINCT b1), COUNT(1)
-         |FROM(
-         |   SELECT
-         |     a, COUNT(b) as b, MAX(b) as b1
-         |   FROM T
-         |   GROUP BY a
-         |) GROUP BY b
-       """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("2,2,2", "4,1,1", "8,1,1")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
-  def testSumCountWithSingleDistinctAndRetraction(): Unit = {
-    // Test for FLINK-23434. The plan and the result is incorrect, because sum with retraction
-    // will produce two acc values, while sum without retraction will produce only one acc value,
-    // the type consistent validation will fail in the IncrementalAggregateRule because wrong
-    // retraction flag is given.
-    val t1 = tEnv.sqlQuery(
-      s"""
-         |SELECT
-         |  b, SUM(b1), COUNT(DISTINCT b1), COUNT(1)
-         |FROM(
-         |   SELECT
-         |     a, COUNT(b) as b, MAX(b) as b1
-         |   FROM T
-         |   GROUP BY a
-         |) GROUP BY b
-       """.stripMargin)
-
-    val sink = new TestingRetractSink
-    t1.toRetractStream[Row].addSink(sink)
-    env.execute()
-
-    val expected = List("2,7,2,2", "4,6,1,1", "8,5,1,1")
-    assertEquals(expected.sorted, sink.getRetractResults.sorted)
-  }
-
-  @Test
   def testAggWithJoin(): Unit = {
     val t1 = tEnv.sqlQuery(
       s"""
