lsyldliu commented on code in PR #21586:
URL: https://github.com/apache/flink/pull/21586#discussion_r1073489490
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
* aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash
Aggregate performs
* much better than Sort Aggregate).
*/
-class HashAggCodeGenerator(
- ctx: CodeGeneratorContext,
- builder: RelBuilder,
- aggInfoList: AggregateInfoList,
- inputType: RowType,
- outputType: RowType,
- grouping: Array[Int],
- auxGrouping: Array[Int],
- isMerge: Boolean,
- isFinal: Boolean) {
+object HashAggCodeGenerator {
- private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+ // It is a experimental config, will may be removed later.
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+ key("table.exec.adaptive-local-hash-agg.enabled")
Review Comment:
Referring to the
[FLIP-283](https://cwiki.apache.org/confluence/display/FLINK/FLIP-283%3A+Use+adaptive+batch+scheduler+as+default+scheduler+for+batch+jobs),
I think it would be better to rename the option to
`table.exec.adaptive.local-hash-agg.enabled`. cc @godfreyhe
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
* aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash
Aggregate performs
* much better than Sort Aggregate).
*/
-class HashAggCodeGenerator(
- ctx: CodeGeneratorContext,
- builder: RelBuilder,
- aggInfoList: AggregateInfoList,
- inputType: RowType,
- outputType: RowType,
- grouping: Array[Int],
- auxGrouping: Array[Int],
- isMerge: Boolean,
- isFinal: Boolean) {
+object HashAggCodeGenerator {
- private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+ // It is a experimental config, will may be removed later.
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+ key("table.exec.adaptive-local-hash-agg.enabled")
+ .booleanType()
+ .defaultValue(Boolean.box(true))
+ .withDescription("Whether to enable adaptive local hash agg")
- private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
- AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
+ key("table.exec.adaptive-local-hash-agg.sample-threshold")
+ .longType()
+ .defaultValue(Long.box(5000000L))
+ .withDescription("If adaptive local hash agg is enabled, "
+ + "the proportion of distinct value will be checked after reading this
number of records")
Review Comment:
```suggestion
+ "the proportion of distinct value will be checked after reading
this number of records.")
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala:
##########
@@ -926,6 +926,19 @@ object AggregateUtil extends Enumeration {
aggInfos.isEmpty || supportMerge
}
+ /** Return true if all aggregates can be projection in adaptive hash agg.
False otherwise. */
+ def doAllAggSupportProjection(aggCalls: Seq[AggregateCall]): Boolean = {
Review Comment:
`doAllSupportAdaptiveHashAgg`?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
Review Comment:
```
s"""
| // set result and output
| $reuseGroupKeyTerm = ($ROW_DATA)$currentKeyTerm;
| $reuseAggBufferTerm = ($ROW_DATA)$currentValueTerm;
| $inputUnboxingCode
| ${outputExpr.code}
| ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
|
""".stripMargin
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala:
##########
@@ -926,6 +926,19 @@ object AggregateUtil extends Enumeration {
aggInfos.isEmpty || supportMerge
}
+ /** Return true if all aggregates can be projection in adaptive hash agg.
False otherwise. */
Review Comment:
```suggestion
/** Return true if all aggregates can be projected for adaptive local hash
aggregate. False otherwise. */
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
* aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash
Aggregate performs
* much better than Sort Aggregate).
*/
-class HashAggCodeGenerator(
- ctx: CodeGeneratorContext,
- builder: RelBuilder,
- aggInfoList: AggregateInfoList,
- inputType: RowType,
- outputType: RowType,
- grouping: Array[Int],
- auxGrouping: Array[Int],
- isMerge: Boolean,
- isFinal: Boolean) {
+object HashAggCodeGenerator {
- private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+ // It is a experimental config, will may be removed later.
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+ key("table.exec.adaptive-local-hash-agg.enabled")
+ .booleanType()
+ .defaultValue(Boolean.box(true))
+ .withDescription("Whether to enable adaptive local hash agg")
- private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
- AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
Review Comment:
```suggestion
val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_THRESHOLD:
ConfigOption[JLong] =
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
+ flushResultIfSuppressEnableCode) = {
+ // from these conditions we know that it must be a distinct operation
+ if (
+ !isFinal &&
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) &&
+ canDoAdaptiveHashAgg
+ ) {
+ val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+ val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+ ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
Review Comment:
```suggestion
ctx.addReusableMember(s"private transient long
$adaptiveDistinctCountTerm = 0;")
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
+ flushResultIfSuppressEnableCode) = {
+ // from these conditions we know that it must be a distinct operation
+ if (
+ !isFinal &&
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) &&
+ canDoAdaptiveHashAgg
+ ) {
+ val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+ val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+ ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
+ ctx.addReusableMember(s"long $adaptiveTotalCountTerm = 0;")
+
+ val loggerTerm = CodeGenUtils.newName("LOG")
+ ctx.addReusableLogger(loggerTerm, className)
+
+ val samplePoint =
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT)
+ val limitDistinctRatio =
+
ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO)
+
+ (
+ s"$adaptiveDistinctCountTerm++;",
+ s"$adaptiveTotalCountTerm++;",
+ s"""
+ |if ($adaptiveTotalCountTerm == $samplePoint) {
+ | $loggerTerm.info("Local hash agg checkpoint reached, sample
point = " +
+ | $samplePoint + ", distinct = " + $adaptiveDistinctCountTerm
+ ", total = " +
+ | $adaptiveTotalCountTerm + ", limit distinct ratio = " +
$limitDistinctRatio);
+ | if ((double) $adaptiveDistinctCountTerm /
$adaptiveTotalCountTerm > $limitDistinctRatio) {
+ | $loggerTerm.info("Local hash agg suppressed");
Review Comment:
```suggestion
| $loggerTerm.info("Local hash agg is suppressed");
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
Review Comment:
`adaptiveHashAggCode` may be better.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
new GeneratedProjection(className, code, ctx.references.toArray,
ctx.tableConfig)
}
+ /**
+ * If adaptive hash agg takes effect, and the sample result was to suppress
local hash agg. In
+ * order to ensure that the data format transmitted downstream with doing
local hash agg is
+ * consistent with the data format transmitted downstream without doing
local hash agg, we need to
+ * do projection for grouping function value.
+ *
+ * <p> For example, for sql statement "select a, avg(b), count(c) from T
group by a", if local
+ * hash agg suppressed and a row (1, 5, "a") comes to local hash agg, we
will pass (1, 5, 1, 1) to
+ * downstream.
+ */
+ def generatedAdaptiveHashAggValueProjectionCode(
+ ctx: CodeGeneratorContext,
+ inputType: RowType,
+ outClass: Class[_ <: RowData] = classOf[BinaryRowData],
+ inputTerm: String = DEFAULT_INPUT1_TERM,
+ aggInfos: Array[AggregateInfo],
+ outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
+ outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM): String = {
+ val fieldExprs: ArrayBuffer[GeneratedExpression] = ArrayBuffer()
+ aggInfos.map {
+ aggInfo =>
+ aggInfo.function match {
+ case _: SumAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
+ ctx,
+ inputType,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: MaxAggFunction | _: MinAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
+ ctx,
+ inputType,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: AvgAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
+ ctx,
+ inputType,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ fieldExprs += GenerateUtils.generateFieldAccessForCountSpecificCol(
+ ctx,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: CountAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccessForCountSpecificCol(
+ ctx,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: Count1AggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccessForCountOne(ctx)
+ }
+ }
+
+ val binaryRowWriter = CodeGenUtils.className[BinaryRowWriter]
+ val typeTerm = outClass.getCanonicalName
+ ctx.addReusableMember(s"$typeTerm $outRecordTerm= new
$typeTerm(${fieldExprs.size});")
+ ctx.addReusableMember(
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
new GeneratedProjection(className, code, ctx.references.toArray,
ctx.tableConfig)
}
+ /**
+ * If adaptive hash agg takes effect, and the sample result was to suppress
local hash agg. In
+ * order to ensure that the data format transmitted downstream with doing
local hash agg is
+ * consistent with the data format transmitted downstream without doing
local hash agg, we need to
+ * do projection for grouping function value.
+ *
+ * <p> For example, for sql statement "select a, avg(b), count(c) from T
group by a", if local
+ * hash agg suppressed and a row (1, 5, "a") comes to local hash agg, we
will pass (1, 5, 1, 1) to
+ * downstream.
+ */
+ def generatedAdaptiveHashAggValueProjectionCode(
+ ctx: CodeGeneratorContext,
+ inputType: RowType,
+ outClass: Class[_ <: RowData] = classOf[BinaryRowData],
+ inputTerm: String = DEFAULT_INPUT1_TERM,
+ aggInfos: Array[AggregateInfo],
+ outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
+ outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM): String = {
+ val fieldExprs: ArrayBuffer[GeneratedExpression] = ArrayBuffer()
+ aggInfos.map {
+ aggInfo =>
+ aggInfo.function match {
+ case _: SumAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
Review Comment:
Assuming the sum field type is bigint, if some value of the field is null,
the result we emit to the downstream global agg is the default value -1L.
Is the final result still correct in that case?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
+ flushResultIfSuppressEnableCode) = {
+ // from these conditions we know that it must be a distinct operation
+ if (
+ !isFinal &&
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) &&
+ canDoAdaptiveHashAgg
+ ) {
+ val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+ val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+ ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
+ ctx.addReusableMember(s"long $adaptiveTotalCountTerm = 0;")
+
+ val loggerTerm = CodeGenUtils.newName("LOG")
Review Comment:
We don't need to define this LOG variable again; it is already defined at
line 95.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
Review Comment:
```suggestion
ctx.addReusableMember(s"private transient boolean
$localAggSuppressedTerm = false;")
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
* aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash
Aggregate performs
* much better than Sort Aggregate).
*/
-class HashAggCodeGenerator(
- ctx: CodeGeneratorContext,
- builder: RelBuilder,
- aggInfoList: AggregateInfoList,
- inputType: RowType,
- outputType: RowType,
- grouping: Array[Int],
- auxGrouping: Array[Int],
- isMerge: Boolean,
- isFinal: Boolean) {
+object HashAggCodeGenerator {
- private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+ // It is a experimental config, will may be removed later.
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+ key("table.exec.adaptive-local-hash-agg.enabled")
+ .booleanType()
+ .defaultValue(Boolean.box(true))
+ .withDescription("Whether to enable adaptive local hash agg")
- private lazy val functionIdentifiers: Map[AggregateFunction[_, _], String] =
- AggCodeGenHelper.getFunctionIdentifiers(aggInfos)
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT: ConfigOption[JLong] =
+ key("table.exec.adaptive-local-hash-agg.sample-threshold")
+ .longType()
+ .defaultValue(Long.box(5000000L))
+ .withDescription("If adaptive local hash agg is enabled, "
+ + "the proportion of distinct value will be checked after reading this
number of records")
- private lazy val aggBufferNames: Array[Array[String]] =
- AggCodeGenHelper.getAggBufferNames(auxGrouping, aggInfos)
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO:
ConfigOption[JDouble] =
+ key("table.exec.adaptive-local-hash-agg.limit-distinct-ratio")
Review Comment:
How should we understand `limit-distinct-ratio`? What does it mean?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java:
##########
@@ -58,6 +58,7 @@ public class BatchExecHashAggregate extends
ExecNodeBase<RowData>
private final RowType aggInputRowType;
private final boolean isMerge;
private final boolean isFinal;
+ private final boolean canDoAdaptiveHashAgg;
Review Comment:
`supportAdaptiveHashAgg`?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -74,6 +98,9 @@ class HashAggCodeGenerator(
// gen code to do group key projection from input
val currentKeyTerm = CodeGenUtils.newName("currentKey")
val currentKeyWriterTerm = CodeGenUtils.newName("currentKeyWriter")
+ // currentValueTerm and currentValueWriterTerm are used for value
projection while canProjection is true.
Review Comment:
```suggestion
// currentValueTerm and currentValueWriterTerm are used for value
projection while canDoAdaptiveHashAgg is true.
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -38,32 +42,52 @@ import org.apache.calcite.tools.RelBuilder
* aggregateBuffers should be update(e.g.: setInt) in [[BinaryRowData]]. (Hash
Aggregate performs
* much better than Sort Aggregate).
*/
-class HashAggCodeGenerator(
- ctx: CodeGeneratorContext,
- builder: RelBuilder,
- aggInfoList: AggregateInfoList,
- inputType: RowType,
- outputType: RowType,
- grouping: Array[Int],
- auxGrouping: Array[Int],
- isMerge: Boolean,
- isFinal: Boolean) {
+object HashAggCodeGenerator {
- private lazy val aggInfos: Array[AggregateInfo] = aggInfoList.aggInfos
+ // It is a experimental config, will may be removed later.
+ @Experimental
+ val TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED: ConfigOption[JBoolean] =
+ key("table.exec.adaptive-local-hash-agg.enabled")
+ .booleanType()
+ .defaultValue(Boolean.box(true))
+ .withDescription("Whether to enable adaptive local hash agg")
Review Comment:
```suggestion
.withDescription("Whether to enable adaptive local hash aggregation.")
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
Review Comment:
`isLocalAggSuppressed`?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
+ flushResultIfSuppressEnableCode) = {
+ // from these conditions we know that it must be a distinct operation
+ if (
+ !isFinal &&
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) &&
+ canDoAdaptiveHashAgg
+ ) {
+ val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+ val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+ ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
+ ctx.addReusableMember(s"long $adaptiveTotalCountTerm = 0;")
Review Comment:
ditto
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
+ flushResultIfSuppressEnableCode) = {
Review Comment:
```suggestion
flushResultSuppressEnableCode) = {
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
Review Comment:
`outResultForAdaptiveHashAgg` may be clearer?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
Review Comment:
```suggestion
distinctCountIncCode,
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala:
##########
@@ -173,6 +214,81 @@ class HashAggCodeGenerator(
HashAggCodeGenHelper.prepareMetrics(ctx, aggregateMapTerm, if (isFinal)
sorterTerm else null)
+ // Do adaptive hash aggregation
+ val outputResultForOneRowAgg = {
+ // gen code to iterating the aggregate map and output to downstream
+ val inputUnboxingCode =
s"${ctx.reuseInputUnboxingCode(reuseAggBufferTerm)}"
+ val rowDataType = classOf[RowData].getCanonicalName
+ s"""
+ | // set result and output
+ |
+ | $reuseGroupKeyTerm = ($rowDataType)$currentKeyTerm;
+ | $reuseAggBufferTerm = ($rowDataType)$currentValueTerm;
+ | $inputUnboxingCode
+ | ${outputExpr.code}
+ | ${OperatorCodeGenerator.generateCollect(outputExpr.resultTerm)}
+ |
+ """.stripMargin
+ }
+ val localAggSuppressedTerm = CodeGenUtils.newName("localAggSuppressed")
+ ctx.addReusableMember(s"boolean $localAggSuppressedTerm = false;")
+ val (
+ distinctCountInCode,
+ totalCountIncCode,
+ adaptiveSamplePointCode,
+ adaptiveSuppressCode,
+ flushResultIfSuppressEnableCode) = {
+ // from these conditions we know that it must be a distinct operation
+ if (
+ !isFinal &&
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_ENABLED) &&
+ canDoAdaptiveHashAgg
+ ) {
+ val adaptiveDistinctCountTerm = CodeGenUtils.newName("distinctCount")
+ val adaptiveTotalCountTerm = CodeGenUtils.newName("totalCount")
+ ctx.addReusableMember(s"long $adaptiveDistinctCountTerm = 0;")
+ ctx.addReusableMember(s"long $adaptiveTotalCountTerm = 0;")
+
+ val loggerTerm = CodeGenUtils.newName("LOG")
+ ctx.addReusableLogger(loggerTerm, className)
+
+ val samplePoint =
+ ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_SAMPLE_POINT)
+ val limitDistinctRatio =
+
ctx.tableConfig.get(TABLE_EXEC_ADAPTIVE_LOCAL_HASH_AGG_LIMIT_DISTINCT_RATIO)
+
+ (
+ s"$adaptiveDistinctCountTerm++;",
+ s"$adaptiveTotalCountTerm++;",
+ s"""
+ |if ($adaptiveTotalCountTerm == $samplePoint) {
+ | $loggerTerm.info("Local hash agg checkpoint reached, sample
point = " +
+ | $samplePoint + ", distinct = " + $adaptiveDistinctCountTerm
+ ", total = " +
+ | $adaptiveTotalCountTerm + ", limit distinct ratio = " +
$limitDistinctRatio);
+ | if ((double) $adaptiveDistinctCountTerm /
$adaptiveTotalCountTerm > $limitDistinctRatio) {
Review Comment:
```suggestion
| if ($adaptiveDistinctCountTerm / (1.0 *
$adaptiveTotalCountTerm) > $limitDistinctRatio) {
```
We can multiply by 1.0 directly instead of forcibly casting to
double.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala:
##########
@@ -611,6 +611,71 @@ object GenerateUtils {
generateInputFieldUnboxing(ctx, inputType, inputCode, inputCode)
}
+ /**
+ * Do projection for grouping function 'max(col)' or 'min(col)' if adaptive
hash agg takes effect.
+ * 'max/min(col)' will be convert to the current col value with same type.
+ */
+ def generateFiledAccessForMaxAndMin(
+ ctx: CodeGeneratorContext,
+ inputType: LogicalType,
+ inputTerm: String,
+ index: Int): GeneratedExpression = {
+ val fieldType = getFieldTypes(inputType).get(index)
+ val resultTypeTerm = primitiveTypeTermForType(fieldType)
+ val defaultValue = primitiveDefaultValue(fieldType)
+ val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
+ val Seq(fieldTerm, nullTerm) =
+ ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean",
"isNull"))
+
+ val inputCode =
+ s"""
+ |$nullTerm = $inputTerm.isNullAt($index);
+ |$fieldTerm = $defaultValue;
+ |if (!$nullTerm) {
+ | $fieldTerm = $readCode;
+ |}
+ """.stripMargin.trim
+
+ GeneratedExpression(fieldTerm, nullTerm, inputCode, fieldType)
+ }
+
+ /**
+ * Do projection for grouping function 'count(col)' if adaptive hash agg
takes effect.
+ * 'Count(col)' will be convert to 1L if col is not null and convert to 0L
if col is null.
+ */
+ def generateFieldAccessForCountSpecificCol(
+ ctx: CodeGeneratorContext,
+ inputTerm: String,
+ index: Int): GeneratedExpression = {
+ val Seq(fieldTerm, nullTerm) =
+ ctx.addReusableLocalVariables(("long", "field"), ("boolean", "isNull"))
+
+ val inputCode =
+ s"""
+ |$nullTerm = $inputTerm.isNullAt($index);
+ |$fieldTerm = 0L;
+ |if (!$nullTerm) {
+ | $fieldTerm = 1L;
+ |}
+ """.stripMargin.trim
+
+ GeneratedExpression(fieldTerm, nullTerm, inputCode, new BigIntType())
+ }
+
+ /**
+ * Do projection for grouping function 'count(*)' or 'count()' if adaptive
hash agg takes effect.
Review Comment:
```suggestion
* Do projection for grouping function 'count(*)' or 'count(1)' if
adaptive hash agg takes effect.
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
new GeneratedProjection(className, code, ctx.references.toArray,
ctx.tableConfig)
}
+ /**
+ * If adaptive hash agg takes effect, and the sample result was to suppress
local hash agg. In
Review Comment:
```suggestion
* If adaptive hash agg takes effect, local hash agg is suppressed. In
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala:
##########
@@ -611,6 +611,71 @@ object GenerateUtils {
generateInputFieldUnboxing(ctx, inputType, inputCode, inputCode)
}
+ /**
+ * Do projection for grouping function 'max(col)' or 'min(col)' if adaptive
hash agg takes effect.
+ * 'max/min(col)' will be convert to the current col value with same type.
+ */
+ def generateFiledAccessForMaxAndMin(
Review Comment:
No one uses it now, drop this method.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
new GeneratedProjection(className, code, ctx.references.toArray,
ctx.tableConfig)
}
+ /**
+ * If adaptive hash agg takes effect, and the sample result was to suppress
local hash agg. In
+ * order to ensure that the data format transmitted downstream with doing
local hash agg is
Review Comment:
```suggestion
* order to ensure that the data structure transmitted downstream with
doing local hash agg is
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##########
@@ -124,6 +129,93 @@ object ProjectionCodeGenerator {
new GeneratedProjection(className, code, ctx.references.toArray,
ctx.tableConfig)
}
+ /**
+ * If adaptive hash agg takes effect, and the sample result was to suppress
local hash agg. In
+ * order to ensure that the data format transmitted downstream with doing
local hash agg is
+ * consistent with the data format transmitted downstream without doing
local hash agg, we need to
+ * do projection for grouping function value.
+ *
+ * <p> For example, for sql statement "select a, avg(b), count(c) from T
group by a", if local
+ * hash agg suppressed and a row (1, 5, "a") comes to local hash agg, we
will pass (1, 5, 1, 1) to
+ * downstream.
+ */
+ def generatedAdaptiveHashAggValueProjectionCode(
+ ctx: CodeGeneratorContext,
+ inputType: RowType,
+ outClass: Class[_ <: RowData] = classOf[BinaryRowData],
+ inputTerm: String = DEFAULT_INPUT1_TERM,
+ aggInfos: Array[AggregateInfo],
+ outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
+ outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM): String = {
+ val fieldExprs: ArrayBuffer[GeneratedExpression] = ArrayBuffer()
+ aggInfos.map {
+ aggInfo =>
+ aggInfo.function match {
+ case _: SumAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
+ ctx,
+ inputType,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: MaxAggFunction | _: MinAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
+ ctx,
+ inputType,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: AvgAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccess(
+ ctx,
+ inputType,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ fieldExprs += GenerateUtils.generateFieldAccessForCountSpecificCol(
+ ctx,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: CountAggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccessForCountSpecificCol(
+ ctx,
+ inputTerm,
+ aggInfo.agg.getArgList.get(0))
+ case _: Count1AggFunction =>
+ fieldExprs += GenerateUtils.generateFieldAccessForCountOne(ctx)
+ }
+ }
+
+ val binaryRowWriter = CodeGenUtils.className[BinaryRowWriter]
+ val typeTerm = outClass.getCanonicalName
+ ctx.addReusableMember(s"$typeTerm $outRecordTerm= new
$typeTerm(${fieldExprs.size});")
Review Comment:
```suggestion
ctx.addReusableMember(s"private $typeTerm $outRecordTerm= new
$typeTerm(${fieldExprs.size});")
```
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala:
##########
@@ -17,12 +17,134 @@
*/
package org.apache.flink.table.planner.runtime.batch.sql.agg
+import org.apache.flink.api.scala._
import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenerator
+
+import org.junit.Test
/** AggregateITCase using HashAgg Operator. */
class HashAggITCase extends AggregateITCaseBase("HashAggregate") {
override def prepareAggOp(): Unit = {
tEnv.getConfig.set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"SortAgg")
}
+
+ @Test
+ def testAdaptiveHashAggWithHighAggregationDegree(): Unit = {
Review Comment:
I think we should also cover the case where some values of the aggregated
field are null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]