godfreyhe commented on code in PR #21586:
URL: https://github.com/apache/flink/pull/21586#discussion_r1073610534


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java:
##########
@@ -58,6 +58,7 @@ public class BatchExecHashAggregate extends 
ExecNodeBase<RowData>
     private final RowType aggInputRowType;
     private final boolean isMerge;
     private final boolean isFinal;
+    private final boolean canDoAdaptiveHashAgg;

Review Comment:
   How about `supportAdaptiveLocalHashAgg`?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")
+      .booleanType()
+      .defaultValue(Boolean.box(true))
+      .withDescription("Whether to enable adaptive local hash agg")

Review Comment:
   Whether to enable adaptive local hash aggregation. This is only used for 
batch jobs. The default value is true.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")
+      .booleanType()
+      .defaultValue(Boolean.box(true))
+      .withDescription("Whether to enable adaptive local hash agg")
 
-  private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
-    AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
+    key("table.exec.adaptive-local-hash-agg.sample-threshold")
+      .longType()
+      .defaultValue(Long.box(5000000L))
+      .withDescription("If adaptive local hash agg is enabled, "
+        + "the proportion of distinct value will be checked after reading this 
number of records")

Review Comment:
   If adaptive local hash aggregation is enabled, this value defines how many 
records will be used as sampled data to calculate the distinct value rate (see 
`TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO`) for the local 
aggregate. The higher the sampling threshold, the more accurate the distinct 
value rate is. But as the sampling threshold increases, local aggregation is 
meaningless when the distinct value rate is low. The default value is 5000000.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")
+      .booleanType()
+      .defaultValue(Boolean.box(true))
+      .withDescription("Whether to enable adaptive local hash agg")
 
-  private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
-    AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
+    key("table.exec.adaptive-local-hash-agg.sample-threshold")

Review Comment:
   table.exec.local-hash-agg.adaptive.sampling-threshold



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
     new GeneratedProjection(className, code, ctx.references.toArray, 
ctx.tableConfig)
   }
 
+  /**
+   * If adaptive hash agg takes effect, and the sample result was to suppress 
local hash agg. In
+   * order to ensure that the data format transmitted downstream with doing 
local hash agg is
+   * consistent with the data format transmitted downstream without doing 
local hash agg, we need to
+   * do projection for grouping function value.
+   *
+   * <p> For example, for sql statement "select a, avg(b), count(c) from T 
group by a", if local
+   * hash agg suppressed and a row (1, 5, "a") comes to local hash agg, we 
will pass (1, 5, 1, 1) to
+   * downstream.
+   */
+  def generatedAdaptiveHashAggValueProjectionCode(

Review Comment:
   generateAdaptiveLocalHashAggProjectionCode



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
 
     HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal) 
sorterTerm else null)
 
+    // Do adaptive hash aggregation
+    val outputResultForOneRowAgg = {
+      // gen code to iterating the aggregate map and output to downstream
+      val inputUnboxingCode = 
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+      val rowDataType = classOf[RowData].getCanonicalName
+      s"""
+         |   // set result and output
+         |
+         |   $reuseGroupKeyTerm =  ($rowDataType)$currentKeyTerm;
+         |   $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+         |   $inputUnboxingCode
+         |   ${outputExpr.code}
+         |   ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+         |
+       """.stripMargin
+    }
+    val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+    ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+    val (
+      distinctCountInCode,
+      totalCountIncCode,
+      adaptiveSamplePointCode,
+      adaptiveSuppressCode,
+      flushResultIfSuppressEnableCode) = {

Review Comment:
   The field names should be adapted to the configuration names



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
 
     HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal) 
sorterTerm else null)
 
+    // Do adaptive hash aggregation
+    val outputResultForOneRowAgg = {
+      // gen code to iterating the aggregate map and output to downstream
+      val inputUnboxingCode = 
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+      val rowDataType = classOf[RowData].getCanonicalName
+      s"""
+         |   // set result and output
+         |
+         |   $reuseGroupKeyTerm =  ($rowDataType)$currentKeyTerm;
+         |   $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+         |   $inputUnboxingCode
+         |   ${outputExpr.code}
+         |   ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+         |
+       """.stripMargin
+    }
+    val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+    ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+    val (
+      distinctCountInCode,
+      totalCountIncCode,
+      adaptiveSamplePointCode,
+      adaptiveSuppressCode,
+      flushResultIfSuppressEnableCode) = {
+      // from these conditions we know that it must be a distinct operation
+      if (
+        !isFinal &&
+        ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) &&
+        canDoAdaptiveHashAgg
+      ) {
+        val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+        val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+        ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
+        ctx.addReusableMember(s"long $adaptiveTotalCountTerm = 0;")
+
+        val loggerTerm = CodeGenUtils.newName("LOG")
+        ctx.addReusableLogger(loggerTerm, className)
+
+        val samplePoint =
+          ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT)
+        val limitDistinctRatio =
+          
ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO)
+
+        (
+          s"$adaptiveDistinctCountTerm++;",
+          s"$adaptiveTotalCountTerm++;",
+          s"""
+             |if ($adaptiveTotalCountTerm == $samplePoint) {
+             |  $loggerTerm.info("Local hash agg checkpoint reached, sample 
point = " +
+             |    $samplePoint + ", distinct = " + $adaptiveDistinctCountTerm 
+ ", total = " +
+             |    $adaptiveTotalCountTerm + ", limit distinct ratio = " + 
$limitDistinctRatio);

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -227,7 +353,13 @@ class HashAggCodeGenerator(
          |}
        """.stripMargin
     } else {
-      s"$outputResultFromMap"
+      s"""
+         |if ($localAggSuppressedTerm) {
+         | return;
+         |} else {
+         | $outputResultFromMap
+         |}
+         |""".stripMargin

Review Comment:
         s"""
            |if (!$localAggSuppressedTerm) {
            | $outputResultFromMap
            |}
            |""".stripMargin



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortAggRule.scala:
##########
@@ -108,7 +108,8 @@ class BatchPhysicalSortAggRule
         auxGroupSet,
         aggBufferTypes,
         aggCallToAggFunction,
-        isLocalHashAgg = false)
+        isLocalHashAgg = false,
+        false)

Review Comment:
   supportAdaptiveLocalHashAgg = false



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala:
##########
@@ -17,12 +17,134 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql.agg
 
+import org.apache.flink.api.scala._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator
+
+import org.junit.Test
 
 /** AggregateITCase using HashAgg Operator. */
 class HashAggITCase extends AggregateITCaseBase("HashAggregate") {

Review Comment:
   I think we can use a `Parameterized` test to verify the different scenarios.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
     new GeneratedProjection(className, code, ctx.references.toArray, 
ctx.tableConfig)
   }
 
+  /**
+   * If adaptive hash agg takes effect, and the sample result was to suppress 
local hash agg. In
+   * order to ensure that the data format transmitted downstream with doing 
local hash agg is
+   * consistent with the data format transmitted downstream without doing 
local hash agg, we need to
+   * do projection for grouping function value.
+   *
+   * <p> For example, for sql statement "select a, avg(b), count(c) from T 
group by a", if local
+   * hash agg suppressed and a row (1, 5, "a") comes to local hash agg, we 
will pass (1, 5, 1, 1) to
+   * downstream.
+   */
+  def generatedAdaptiveHashAggValueProjectionCode(
+      ctx: CodeGeneratorContext,
+      inputType: RowType,
+      outClass: Class[_ <: RowData] = classOf[BinaryRowData],
+      inputTerm: String = DEFAULT_INPUT1_TERM,
+      aggInfos: Array[AggregateInfo],
+      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
+      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM): String = {
+    val fieldExprs: ArrayBuffer[GeneratedExpression] = ArrayBuffer()
+    aggInfos.map {
+      aggInfo =>
+        aggInfo.function match {
+          case _: SumAggFunction =>
+            fieldExprs += GenerateUtils.generateFieldAccess(
+              ctx,
+              inputType,
+              inputTerm,
+              aggInfo.agg.getArgList.get(0))
+          case _: MaxAggFunction | _: MinAggFunction =>
+            fieldExprs += GenerateUtils.generateFieldAccess(
+              ctx,
+              inputType,
+              inputTerm,
+              aggInfo.agg.getArgList.get(0))

Review Comment:
   they can be merged into one branch



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")
+      .booleanType()
+      .defaultValue(Boolean.box(true))
+      .withDescription("Whether to enable adaptive local hash agg")
 
-  private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
-    AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
+    key("table.exec.adaptive-local-hash-agg.sample-threshold")
+      .longType()
+      .defaultValue(Long.box(5000000L))
+      .withDescription("If adaptive local hash agg is enabled, "
+        + "the proportion of distinct value will be checked after reading this 
number of records")
 
-  private lazy val aggBufferNames: Array[Array[String]] =
-    AggCodeGenHelper.getAggBufferNames(auxGrouping, aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO: 
ConfigOption[JDouble] =
+    key("table.exec.adaptive-local-hash-agg.limit-distinct-ratio")

Review Comment:
   table.exec.local-hash-agg.adaptive.distinct-value-rate-threshold



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")
+      .booleanType()
+      .defaultValue(Boolean.box(true))
+      .withDescription("Whether to enable adaptive local hash agg")
 
-  private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
-    AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =

Review Comment:
   TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLING_THRESHOLD



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")
+      .booleanType()
+      .defaultValue(Boolean.box(true))
+      .withDescription("Whether to enable adaptive local hash agg")
 
-  private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
-    AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
+    key("table.exec.adaptive-local-hash-agg.sample-threshold")
+      .longType()
+      .defaultValue(Long.box(5000000L))
+      .withDescription("If adaptive local hash agg is enabled, "
+        + "the proportion of distinct value will be checked after reading this 
number of records")
 
-  private lazy val aggBufferNames: Array[Array[String]] =
-    AggCodeGenHelper.getAggBufferNames(auxGrouping, aggInfos)
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO: 
ConfigOption[JDouble] =
+    key("table.exec.adaptive-local-hash-agg.limit-distinct-ratio")
+      .doubleType()
+      .defaultValue(0.5d)
+      .withDescription("If adaptive distinct is enabled, local aggregation 
will be suppressed " +
+        "if the ratio of distinct keys is higher than this value after sample 
point")

Review Comment:
   The distinct value rate can be defined as the number of local aggregation 
results for the sampled data divided by the sampling threshold (see 
TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLING_THRESHOLD). If the computed result 
is lower than the given configuration value, the remaining input records 
proceed to do local aggregation; otherwise the remaining input records are 
subjected to a simple projection whose calculation cost is less than that of 
local aggregation. The default value is 0.5.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -74,6 +98,9 @@ class HashAggCodeGenerator(
     // gen code to do group key projection from input
     val currentKeyTerm = CodeGenUtils.newName("currentKey")
     val currentKeyWriterTerm = CodeGenUtils.newName("currentKeyWriter")
+    // currentValueTerm and currentValueWriterTerm are used for value 
projection while canProjection is true.

Review Comment:
   while `supportAdaptiveLocalHashAgg` is true.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
  * aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash 
Aggregate performs
  * much better than Sort Aggregate).
  */
-class HashAggCodeGenerator(
-    ctx: CodeGeneratorContext,
-    builder: RelBuilder,
-    aggInfoList: AggregateInfoList,
-    inputType: RowType,
-    outputType: RowType,
-    grouping: Array[Int],
-    auxGrouping: Array[Int],
-    isMerge: Boolean,
-    isFinal: Boolean) {
+object HashAggCodeGenerator {
 
-  private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+  // It is a experimental config, will may be removed later.
+  @Experimental
+  val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+    key("table.exec.adaptive-local-hash-agg.enabled")

Review Comment:
   table.exec.local-hash-agg.adaptive.enabled



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala:
##########
@@ -611,6 +611,71 @@ object GenerateUtils {
       generateInputFieldUnboxing(ctx, inputType, inputCode, inputCode)
   }
 
+  /**
+   * Do projection for grouping function 'max(col)' or 'min(col)' if adaptive 
hash agg takes effect.
+   * 'max/min(col)' will be convert to the current col value with same type.
+   */
+  def generateFiledAccessForMaxAndMin(

Review Comment:
   Useless code?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalSortAggRule.scala:
##########
@@ -80,7 +80,7 @@ class EnforceLocalSortAggRule
     val localGrouping = agg.grouping
     // create local sort
     val localSort = createSort(expand, localGrouping)
-    val localAgg = createLocalAgg(agg, localSort)
+    val localAgg = createLocalAgg(agg, localSort, false)

Review Comment:
   use named parameter



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashAggregate.scala:
##########
@@ -161,6 +161,7 @@ class BatchPhysicalHashAggregate(
       FlinkTypeFactory.toLogicalRowType(aggInputRowType),
       isMerge,
       true, // isFinal is always true
+      false,

Review Comment:
   add "// supportAdaptiveLocalHashAgg is always false"



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala:
##########
@@ -17,12 +17,134 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql.agg
 
+import org.apache.flink.api.scala._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator
+
+import org.junit.Test
 
 /** AggregateITCase using HashAgg Operator. */
 class HashAggITCase extends AggregateITCaseBase("HashAggregate") {
 
   override def prepareAggOp(): Unit = {
     tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"SortAgg")
   }
+
+  @Test
+  def testAdaptiveHashAggWithHighAggregationDegree(): Unit = {
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED,
+      Boolean.box(true))
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT,
+      Long.box(5L))
+
+    checkQuery(
+      Seq(
+        (1, 1, 1, 1),
+        (1, 1, 1, 2),
+        (1, 1, 2, 3),
+        (1, 1, 2, 2),
+        (1, 1, 3, 3),
+        (1, 2, 1, 1),
+        (1, 2, 1, 2),
+        (1, 3, 1, 1),
+        (1, 4, 1, 1),
+        (2, 1, 2, 2),
+        (2, 2, 3, 3)),
+      "SELECT f0, f1, sum(f2), max(f3), count(f3), count(*) FROM TableName 
GROUP BY f0, f1",

Review Comment:
   add `min`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalHashAggRule.scala:
##########
@@ -69,7 +69,7 @@ class EnforceLocalHashAggRule
     val agg: BatchPhysicalHashAggregate = call.rel(0)
     val expand: BatchPhysicalExpand = call.rel(2)
 
-    val localAgg = createLocalAgg(agg, expand)
+    val localAgg = createLocalAgg(agg, expand, true)

Review Comment:
   Why is `supportAdaptiveLocalHashAgg` always true?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala:
##########
@@ -17,12 +17,134 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql.agg
 
+import org.apache.flink.api.scala._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator
+
+import org.junit.Test
 
 /** AggregateITCase using HashAgg Operator. */
 class HashAggITCase extends AggregateITCaseBase("HashAggregate") {
 
   override def prepareAggOp(): Unit = {
     tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"SortAgg")
   }
+
+  @Test
+  def testAdaptiveHashAggWithHighAggregationDegree(): Unit = {
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED,
+      Boolean.box(true))
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT,
+      Long.box(5L))
+
+    checkQuery(
+      Seq(
+        (1, 1, 1, 1),
+        (1, 1, 1, 2),
+        (1, 1, 2, 3),
+        (1, 1, 2, 2),
+        (1, 1, 3, 3),
+        (1, 2, 1, 1),
+        (1, 2, 1, 2),
+        (1, 3, 1, 1),
+        (1, 4, 1, 1),
+        (2, 1, 2, 2),
+        (2, 2, 3, 3)),
+      "SELECT f0, f1, sum(f2), max(f3), count(f3), count(*) FROM TableName 
GROUP BY f0, f1",
+      Seq(
+        (1, 1, 9, 3, 5, 5),
+        (1, 2, 2, 2, 2, 2),
+        (1, 3, 1, 1, 1, 1),
+        (1, 4, 1, 1, 1, 1),
+        (2, 1, 2, 2, 1, 1),
+        (2, 2, 3, 3, 1, 1))
+    )
+  }
+
+  @Test
+  def testAdaptiveHashAggWithLowAggregationDegree(): Unit = {
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED,
+      Boolean.box(true))
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT,
+      Long.box(5L))
+
+    checkQuery(
+      Seq(
+        (1, 1, 1, 1),
+        (1, 1, 1, 2),
+        (1, 2, 2, 3),
+        (1, 3, 2, 2),
+        (1, 4, 3, 3),
+        (1, 5, 1, 1),
+        (2, 1, 1, 2),
+        (2, 2, 1, 1),
+        (2, 3, 1, 1),
+        (2, 3, 2, 2),
+        (2, 3, 3, 3)),
+      "SELECT f0, f1, sum(f2), max(f3) FROM TableName GROUP BY f0, f1",
+      Seq(
+        (1, 1, 2, 2),
+        (1, 2, 2, 3),
+        (1, 3, 2, 2),
+        (1, 4, 3, 3),
+        (1, 5, 1, 1),
+        (2, 1, 1, 2),
+        (2, 2, 1, 1),
+        (2, 3, 6, 3))
+    )
+  }
+
+  @Test
+  def testAdaptiveHashAggWithRowLessThanSamplePoint(): Unit = {
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED,
+      Boolean.box(true))
+    tEnv.getConfig.set(
+      HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT,
+      Long.box(5L))
+
+    checkQuery(
+      Seq((1, 1, 1, 1), (1, 1, 1, 2), (1, 2, 2, 3)),
+      "SELECT f0, f1, sum(f2), max(f3) FROM TableName GROUP BY f0, f1",
+      Seq((1, 1, 2, 2), (1, 2, 2, 3))
+    )
+  }
+
+  @Test
+  def testAdaptiveHashAggWithDifferentSumColType(): Unit = {

Review Comment:
   this can be merged into one case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to