This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a603f2bb853 [FLINK-34229][table-runtime] Set CodeGeneratorContext of
outer class as ancestor context when generating inner classes to avoid naming
conflicts
a603f2bb853 is described below
commit a603f2bb8536fa75453694a77c66a76f2e33b941
Author: zoudan <[email protected]>
AuthorDate: Tue Jan 30 19:18:21 2024 +0800
[FLINK-34229][table-runtime] Set CodeGeneratorContext of outer class as
ancestor context when generating inner classes to avoid naming conflicts
This closes #24228
---
.../flink/table/planner/codegen/CodeGenUtils.scala | 6 ++---
.../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 ++-
.../codegen/sort/ComparatorCodeGenerator.scala | 31 +++++++++++++++++++++-
.../planner/codegen/sort/SortCodeGenerator.scala | 17 +++++++++---
.../batch/sql/OperatorFusionCodegenITCase.scala | 13 +++++++++
5 files changed, 62 insertions(+), 8 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 52be23ed123..312bd0e0b58 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -332,7 +332,7 @@ object CodeGenUtils {
s"Unsupported type($t) to generate hash code," +
s" the type($t) is not supported as a
GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field.")
case ARRAY =>
- val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+ val subCtx = new CodeGeneratorContext(ctx.tableConfig,
ctx.classLoader, ctx)
val genHash =
HashCodeGenerator.generateArrayHash(
subCtx,
@@ -340,7 +340,7 @@ object CodeGenUtils {
"SubHashArray")
genHashFunction(ctx, subCtx, genHash, term)
case MULTISET | MAP =>
- val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+ val subCtx = new CodeGeneratorContext(ctx.tableConfig,
ctx.classLoader, ctx)
val (keyType, valueType) = t match {
case multiset: MultisetType =>
(multiset.getElementType, new IntType())
@@ -353,7 +353,7 @@ object CodeGenUtils {
case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
case ROW | STRUCTURED_TYPE =>
val fieldCount = getFieldCount(t)
- val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+ val subCtx = new CodeGeneratorContext(ctx.tableConfig,
ctx.classLoader, ctx)
val genHash =
HashCodeGenerator.generateRowHash(subCtx, t, "SubHashRow", (0 until
fieldCount).toArray)
genHashFunction(ctx, subCtx, genHash, term)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 35e0613bddc..8453522d45c 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -901,7 +901,8 @@ object HashAggCodeGenHelper {
ctx.tableConfig,
ctx.classLoader,
aggMapKeyType,
- SortUtil.getAscendingSortSpec(Array.range(0,
aggMapKeyType.getFieldCount)))
+ SortUtil.getAscendingSortSpec(Array.range(0,
aggMapKeyType.getFieldCount)),
+ ctx)
val computer =
sortCodeGenerator.generateNormalizedKeyComputer("AggMapKeyComputer")
val comparator =
sortCodeGenerator.generateRecordComparator("AggMapValueComparator")
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
index a14b7376b08..d6bda014ce3 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
@@ -51,9 +51,38 @@ object ComparatorCodeGenerator {
name: String,
inputType: RowType,
sortSpec: SortSpec): GeneratedRecordComparator = {
+ gen(tableConfig, classLoader, name, inputType, sortSpec, null)
+ }
+
+ /**
+ * Generates a [[RecordComparator]] that can be passed to a Java compiler.
+ *
+ * @param tableConfig
+ * Table config.
+ * @param classLoader
+ * user ClassLoader.
+ * @param name
+ * Class name of the function. Does not need to be unique but has to be a
valid Java class
+ * identifier.
+ * @param inputType
+ * input type.
+ * @param sortSpec
+ * sort specification.
+ * @param parentCtx
+ * parent CodeGeneratorContext to avoid name conflicts.
+ * @return
+ * A GeneratedRecordComparator
+ */
+ def gen(
+ tableConfig: ReadableConfig,
+ classLoader: ClassLoader,
+ name: String,
+ inputType: RowType,
+ sortSpec: SortSpec,
+ parentCtx: CodeGeneratorContext): GeneratedRecordComparator = {
val baseClass = classOf[RecordComparator]
- val ctx = new CodeGeneratorContext(tableConfig, classLoader)
+ val ctx = new CodeGeneratorContext(tableConfig, classLoader, parentCtx)
val className = newName(ctx, name)
val compareCode = GenerateUtils.generateRowCompare(ctx, inputType,
sortSpec, "o1", "o2")
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
index 1369b427eef..7f7c90bedac 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
@@ -41,12 +41,23 @@ import scala.collection.mutable
* input type.
* @param sortSpec
* sort specification.
+ * @param parentCtx
+ * parent CodeGeneratorContext to avoid name conflicts. If the generated
[[NormalizedKeyComputer]]
+ * and [[RecordComparator]] will be used as inner classes, a non-null value
must be set.
*/
class SortCodeGenerator(
tableConfig: ReadableConfig,
classLoader: ClassLoader,
val input: RowType,
- val sortSpec: SortSpec) {
+ val sortSpec: SortSpec,
+ parentCtx: CodeGeneratorContext) {
+
+ def this(
+ tableConfig: ReadableConfig,
+ classLoader: ClassLoader,
+ input: RowType,
+ sortSpec: SortSpec) =
+ this(tableConfig, classLoader, input, sortSpec, null)
private val MAX_NORMALIZED_KEY_LEN = 16
@@ -130,7 +141,7 @@ class SortCodeGenerator(
* A GeneratedNormalizedKeyComputer
*/
def generateNormalizedKeyComputer(name: String):
GeneratedNormalizedKeyComputer = {
- val ctx = new CodeGeneratorContext(tableConfig, classLoader)
+ val ctx = new CodeGeneratorContext(tableConfig, classLoader, parentCtx)
val className = newName(ctx, name)
val (keyFullyDetermines, numKeyBytes) = getKeyFullyDeterminesAndBytes
@@ -386,7 +397,7 @@ class SortCodeGenerator(
* A GeneratedRecordComparator
*/
def generateRecordComparator(name: String): GeneratedRecordComparator = {
- ComparatorCodeGenerator.gen(tableConfig, classLoader, name, input,
sortSpec)
+ ComparatorCodeGenerator.gen(tableConfig, classLoader, name, input,
sortSpec, parentCtx)
}
def getter(t: LogicalType, index: Int): String = {
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
index 1211bb025b0..5782315fb4d 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
@@ -285,6 +285,19 @@ class OperatorFusionCodegenITCase extends BatchTestBase {
)
}
+ @TestTemplate
+ def testMultipleHashAgg(): Unit = {
+ checkOpFusionCodegenResult(
+ """
+ |SELECT * FROM
+ | (SELECT a, SUM(b) as b FROM x group by a) T1
+ | INNER JOIN
+ | (SELECT d, SUM(e) as e FROM y group by d) T2
+ | ON T1.a = T2.d
+ |""".stripMargin
+ )
+ }
+
@TestTemplate
def testGlobalHashAggWithKey2(): Unit = {
checkOpFusionCodegenResult(