This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a603f2bb853 [FLINK-34229][table-runtime] Set CodeGeneratorContext of 
outer class as ancestor context when generate inner classes to avoid naming 
conflicts
a603f2bb853 is described below

commit a603f2bb8536fa75453694a77c66a76f2e33b941
Author: zoudan <[email protected]>
AuthorDate: Tue Jan 30 19:18:21 2024 +0800

    [FLINK-34229][table-runtime] Set CodeGeneratorContext of outer class as 
ancestor context when generate inner classes to avoid naming conflicts
    
    This closes #24228
---
 .../flink/table/planner/codegen/CodeGenUtils.scala |  6 ++---
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |  3 ++-
 .../codegen/sort/ComparatorCodeGenerator.scala     | 31 +++++++++++++++++++++-
 .../planner/codegen/sort/SortCodeGenerator.scala   | 17 +++++++++---
 .../batch/sql/OperatorFusionCodegenITCase.scala    | 13 +++++++++
 5 files changed, 62 insertions(+), 8 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 52be23ed123..312bd0e0b58 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -332,7 +332,7 @@ object CodeGenUtils {
           s"Unsupported type($t) to generate hash code," +
             s" the type($t) is not supported as a 
GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field.")
       case ARRAY =>
-        val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+        val subCtx = new CodeGeneratorContext(ctx.tableConfig, 
ctx.classLoader, ctx)
         val genHash =
           HashCodeGenerator.generateArrayHash(
             subCtx,
@@ -340,7 +340,7 @@ object CodeGenUtils {
             "SubHashArray")
         genHashFunction(ctx, subCtx, genHash, term)
       case MULTISET | MAP =>
-        val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+        val subCtx = new CodeGeneratorContext(ctx.tableConfig, 
ctx.classLoader, ctx)
         val (keyType, valueType) = t match {
           case multiset: MultisetType =>
             (multiset.getElementType, new IntType())
@@ -353,7 +353,7 @@ object CodeGenUtils {
       case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
       case ROW | STRUCTURED_TYPE =>
         val fieldCount = getFieldCount(t)
-        val subCtx = new CodeGeneratorContext(ctx.tableConfig, ctx.classLoader)
+        val subCtx = new CodeGeneratorContext(ctx.tableConfig, 
ctx.classLoader, ctx)
         val genHash =
           HashCodeGenerator.generateRowHash(subCtx, t, "SubHashRow", (0 until 
fieldCount).toArray)
         genHashFunction(ctx, subCtx, genHash, term)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 35e0613bddc..8453522d45c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -901,7 +901,8 @@ object HashAggCodeGenHelper {
       ctx.tableConfig,
       ctx.classLoader,
       aggMapKeyType,
-      SortUtil.getAscendingSortSpec(Array.range(0, 
aggMapKeyType.getFieldCount)))
+      SortUtil.getAscendingSortSpec(Array.range(0, 
aggMapKeyType.getFieldCount)),
+      ctx)
     val computer = 
sortCodeGenerator.generateNormalizedKeyComputer("AggMapKeyComputer")
     val comparator = 
sortCodeGenerator.generateRecordComparator("AggMapValueComparator")
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
index a14b7376b08..d6bda014ce3 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
@@ -51,9 +51,38 @@ object ComparatorCodeGenerator {
       name: String,
       inputType: RowType,
       sortSpec: SortSpec): GeneratedRecordComparator = {
+    gen(tableConfig, classLoader, name, inputType, sortSpec, null)
+  }
+
+  /**
+   * Generates a [[RecordComparator]] that can be passed to a Java compiler.
+   *
+   * @param tableConfig
+   *   Table config.
+   * @param classLoader
+   *   user ClassLoader.
+   * @param name
+   *   Class name of the function. Does not need to be unique but has to be a 
valid Java class
+   *   identifier.
+   * @param inputType
+   *   input type.
+   * @param sortSpec
+   *   sort specification.
+   * @param parentCtx
+   *   parent CodeGeneratorContext to avoid name conflicts.
+   * @return
+   *   A GeneratedRecordComparator
+   */
+  def gen(
+      tableConfig: ReadableConfig,
+      classLoader: ClassLoader,
+      name: String,
+      inputType: RowType,
+      sortSpec: SortSpec,
+      parentCtx: CodeGeneratorContext): GeneratedRecordComparator = {
     val baseClass = classOf[RecordComparator]
 
-    val ctx = new CodeGeneratorContext(tableConfig, classLoader)
+    val ctx = new CodeGeneratorContext(tableConfig, classLoader, parentCtx)
     val className = newName(ctx, name)
     val compareCode = GenerateUtils.generateRowCompare(ctx, inputType, 
sortSpec, "o1", "o2")
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
index 1369b427eef..7f7c90bedac 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/sort/SortCodeGenerator.scala
@@ -41,12 +41,23 @@ import scala.collection.mutable
  *   input type.
  * @param sortSpec
  *   sort specification.
+ * @param parentCtx
+ *   parent CodeGeneratorContext to avoid name conflicts. If the generated 
[[NormalizedKeyComputer]]
+ *   and [[RecordComparator]] will be used as inner classes, a non-null value 
must be set.
  */
 class SortCodeGenerator(
     tableConfig: ReadableConfig,
     classLoader: ClassLoader,
     val input: RowType,
-    val sortSpec: SortSpec) {
+    val sortSpec: SortSpec,
+    parentCtx: CodeGeneratorContext) {
+
+  def this(
+      tableConfig: ReadableConfig,
+      classLoader: ClassLoader,
+      input: RowType,
+      sortSpec: SortSpec) =
+    this(tableConfig, classLoader, input, sortSpec, null)
 
   private val MAX_NORMALIZED_KEY_LEN = 16
 
@@ -130,7 +141,7 @@ class SortCodeGenerator(
    *   A GeneratedNormalizedKeyComputer
    */
   def generateNormalizedKeyComputer(name: String): 
GeneratedNormalizedKeyComputer = {
-    val ctx = new CodeGeneratorContext(tableConfig, classLoader)
+    val ctx = new CodeGeneratorContext(tableConfig, classLoader, parentCtx)
     val className = newName(ctx, name)
 
     val (keyFullyDetermines, numKeyBytes) = getKeyFullyDeterminesAndBytes
@@ -386,7 +397,7 @@ class SortCodeGenerator(
    *   A GeneratedRecordComparator
    */
   def generateRecordComparator(name: String): GeneratedRecordComparator = {
-    ComparatorCodeGenerator.gen(tableConfig, classLoader, name, input, 
sortSpec)
+    ComparatorCodeGenerator.gen(tableConfig, classLoader, name, input, 
sortSpec, parentCtx)
   }
 
   def getter(t: LogicalType, index: Int): String = {
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
index 1211bb025b0..5782315fb4d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OperatorFusionCodegenITCase.scala
@@ -285,6 +285,19 @@ class OperatorFusionCodegenITCase extends BatchTestBase {
     )
   }
 
+  @TestTemplate
+  def testMultipleHashAgg(): Unit = {
+    checkOpFusionCodegenResult(
+      """
+        |SELECT * FROM
+        |  (SELECT a, SUM(b) as b FROM x group by a) T1
+        |  INNER JOIN
+        |  (SELECT d, SUM(e) as e FROM y group by d) T2
+        |  ON T1.a = T2.d
+        |""".stripMargin
+    )
+  }
+
   @TestTemplate
   def testGlobalHashAggWithKey2(): Unit = {
     checkOpFusionCodegenResult(

Reply via email to