lsyldliu commented on code in PR #21586:
URL: https://github.com/apache/flink/pull/21586#discussion_r1067180556


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala:
##########
@@ -611,6 +611,32 @@ object GenerateUtils {
       generateInputFieldUnboxing(ctx, inputType, inputCode, inputCode)
   }
 
+  def generateFieldAccessForCountCol(

Review Comment:
   Could we rename this to `generateFieldAccessForCountSpecificCol`?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
     new GeneratedProjection(className, code, ctx.references.toArray, ctx.tableConfig)
   }
 
+  /**
+   * If adaptive hash agg takes effect, and the sample result was to suppress local hash agg. In
+   * order to ensure that the data format transmitted downstream with doing local hash agg is
+   * consistent with the data format transmitted downstream without doing local hash agg, we need to
+   * do projection for grouping function value.
+   *
+   * <p> For example, for sql statement "select a, avg(b), count(c) from T group by a", if local
+   * hash agg suppressed and a row (1, 5, "a") comes to local hash agg, we will pass (1, 5, 1, 1) to
+   * downstream.
+   */
+  def generatedValueProjectionCode(

Review Comment:
   Could we rename this to `generatedAdaptiveHashAggValueProjectionCode`?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +206,81 @@ class HashAggCodeGenerator(
 
     HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal) sorterTerm else null)
 
+    // Do adaptive hash aggregation
+    val outputResultForOneRowAgg = {
+      // gen code to iterating the aggregate map and output to downstream
+      val inputUnboxingCode = s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+      val rowDataType = classOf[RowData].getCanonicalName
+      s"""
+         |   // set result and output
+         |
+         |   $reuseGroupKeyTerm =  ($rowDataType)$currentKeyTerm;
+         |   $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+         |   $inputUnboxingCode
+         |   ${outputExpr.code}
+         |   ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+         |
+       """.stripMargin
+    }
+    val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+    ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+    val (
+      distinctCountInCode,
+      totalCountIncCode,
+      adaptiveSamplePointCode,
+      adaptiveSuppressCode,
+      flushResultIfSuppressEnableCode) = {
+      // from these conditions we know that it must be a distinct operation
+      if (
+        !isFinal &&
+        ctx.tableConfig.get(
+          HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) && canDoAdaptiveHashAgg

Review Comment:
   Could we remove the `HashAggCodeGenerator.` prefix here?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +206,81 @@ class HashAggCodeGenerator(
 
     HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal) sorterTerm else null)
 
+    // Do adaptive hash aggregation
+    val outputResultForOneRowAgg = {
+      // gen code to iterating the aggregate map and output to downstream
+      val inputUnboxingCode = s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+      val rowDataType = classOf[RowData].getCanonicalName
+      s"""
+         |   // set result and output
+         |
+         |   $reuseGroupKeyTerm =  ($rowDataType)$currentKeyTerm;
+         |   $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+         |   $inputUnboxingCode
+         |   ${outputExpr.code}
+         |   ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+         |
+       """.stripMargin
+    }
+    val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+    ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+    val (
+      distinctCountInCode,
+      totalCountIncCode,
+      adaptiveSamplePointCode,
+      adaptiveSuppressCode,
+      flushResultIfSuppressEnableCode) = {
+      // from these conditions we know that it must be a distinct operation
+      if (
+        !isFinal &&
+        ctx.tableConfig.get(
+          HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) && canDoAdaptiveHashAgg
+      ) {
+        val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+        val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+        ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
+        ctx.addReusableMember(s"long $adaptiveTotalCountTerm = 0;")
+
+        val loggerTerm = CodeGenUtils.newName("LOG")
+        ctx.addReusableLogger(loggerTerm, className)
+
+        val samplePoint =
+          ctx.tableConfig.get(HashAggCodeGenerator.TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT)

Review Comment:
   Ditto: could we remove the `HashAggCodeGenerator.` prefix here as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to