This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 35acc4c2d [codegen] EqualiserCodeGenerator supports ARRAY<ROW> (#3023)
35acc4c2d is described below

commit 35acc4c2de17c6b36b985b73fdfe28b922971bec
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 15 18:28:04 2024 +0800

    [codegen] EqualiserCodeGenerator supports ARRAY<ROW> (#3023)
---
 .../paimon/codegen/EqualiserCodeGenerator.scala    | 18 +++-----------
 .../apache/paimon/codegen/ScalarOperatorGens.scala | 26 +++++++++++++++++++
 .../flink/FullCompactionFileStoreITCase.java       | 29 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala
index b76f170a6..41c7427af 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala
@@ -19,9 +19,9 @@
 package org.apache.paimon.codegen
 
 import org.apache.paimon.codegen.GenerateUtils._
-import org.apache.paimon.codegen.ScalarOperatorGens.generateEquals
+import org.apache.paimon.codegen.ScalarOperatorGens.{generateEquals, generateRowEqualiser}
 import org.apache.paimon.types.{BooleanType, DataType, RowType}
-import org.apache.paimon.types.DataTypeChecks.{getFieldTypes, isCompositeType}
+import org.apache.paimon.types.DataTypeChecks.isCompositeType
 import org.apache.paimon.types.DataTypeRoot._
 import org.apache.paimon.utils.TypeUtils.isPrimitive
 
@@ -136,19 +136,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType]) {
     if (isInternalPrimitive(fieldType)) {
       ("", s"$leftFieldTerm == $rightFieldTerm")
     } else if (isCompositeType(fieldType)) {
-      val equaliserGenerator =
-        new EqualiserCodeGenerator(getFieldTypes(fieldType).asScala.toArray)
-      val generatedEqualiser = equaliserGenerator.generateRecordEqualiser("fieldGeneratedEqualiser")
-      val generatedEqualiserTerm =
-        ctx.addReusableObject(generatedEqualiser, "fieldGeneratedEqualiser")
-      val equaliserTypeTerm = classOf[RecordEqualiser].getCanonicalName
-      val equaliserTerm = newName("equaliser")
-      ctx.addReusableMember(s"private $equaliserTypeTerm $equaliserTerm = null;")
-      ctx.addReusableInitStatement(
-        s"""
-           |$equaliserTerm = ($equaliserTypeTerm)
-           |  $generatedEqualiserTerm.newInstance(this.getClass().getClassLoader());
-           |""".stripMargin)
+      val equaliserTerm = generateRowEqualiser(ctx, fieldType)
       ("", s"$equaliserTerm.equals($leftFieldTerm, $rightFieldTerm)")
     } else {
       val left = GeneratedExpression(leftFieldTerm, leftNullTerm, "", fieldType)
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
index 841eea331..5dfa4bff6 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
@@ -21,10 +21,13 @@ package org.apache.paimon.codegen
 import org.apache.paimon.codegen.GenerateUtils._
 import org.apache.paimon.data.serializer.InternalMapSerializer
 import org.apache.paimon.types._
+import org.apache.paimon.types.DataTypeChecks.{getFieldTypes, isCompositeType}
 import org.apache.paimon.utils.InternalRowUtils
 import org.apache.paimon.utils.TypeCheckUtils._
 import org.apache.paimon.utils.TypeUtils.isInteroperable
 
+import scala.collection.JavaConverters._
+
 /**
  * Utilities to generate SQL scalar operators, e.g. arithmetic operator, compare operator, equal
  * operator, etc.
@@ -78,6 +81,10 @@ object ScalarOperatorGens {
     // comparable types of same type
     else if (isComparable(left.resultType) && canEqual) {
       generateComparison(ctx, "==", left, right, resultType)
+    } else if (isCompositeType(left.resultType) && canEqual) {
+      val equaliserTerm = generateRowEqualiser(ctx, left.resultType)
+      generateOperatorIfNotNull(ctx, resultType, left, right)(
+        (leftTerm, rightTerm) => s"$equaliserTerm.equals($leftTerm, $rightTerm)")
     }
     // non comparable types
     else {
@@ -95,6 +102,25 @@ object ScalarOperatorGens {
     }
   }
 
+  /** Generates [[RecordEqualiser]] code for row and return equaliser name. */
+  def generateRowEqualiser(ctx: CodeGeneratorContext, fieldType: DataType): String = {
+    val equaliserGenerator =
+      new EqualiserCodeGenerator(getFieldTypes(fieldType).asScala.toArray)
+    val generatedEqualiser =
+      equaliserGenerator.generateRecordEqualiser("fieldGeneratedEqualiser")
+    val generatedEqualiserTerm =
+      ctx.addReusableObject(generatedEqualiser, "fieldGeneratedEqualiser")
+    val equaliserTypeTerm = classOf[RecordEqualiser].getCanonicalName
+    val equaliserTerm = newName("equaliser")
+    ctx.addReusableMember(s"private $equaliserTypeTerm $equaliserTerm = null;")
+    ctx.addReusableInitStatement(
+      s"""
+         |$equaliserTerm = ($equaliserTypeTerm)
+         |  $generatedEqualiserTerm.newInstance(this.getClass().getClassLoader());
+         |""".stripMargin)
+    equaliserTerm
+  }
+
   /** Generates comparison code for numeric types and comparable types of same type. */
   def generateComparison(
       ctx: CodeGeneratorContext,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
index e23dff160..a40587782 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
@@ -164,4 +164,33 @@ public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT * FROM %s$audit_log", table))
                 .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
     }
+
+    @Test
+    public void testRowDeduplicateWithArrayRow() throws Exception {
+        String table = "T_ARRAY_ROW";
+        tEnv.executeSql(
+                "CREATE TABLE IF NOT EXISTS "
+                        + table
+                        + "("
+                        + "ID INT PRIMARY KEY NOT ENFORCED,\n"
+                        + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
+                        + ") WITH ("
+                        + "'changelog-producer'='full-compaction',"
+                        + "'changelog-producer.compaction-interval' = '1s',"
+                        + "'changelog-producer.row-deduplicate' = 'true')");
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+        sql("INSERT INTO %s VALUES (1, ARRAY[('a','mark1')]);", table);
+        assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("+I[1, [+I[a, mark1]]]");
+
+        sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')])", table);
+        assertThat(iterator.collect(2).stream().map(Row::toString).collect(Collectors.toList()))
+                .containsExactly("-U[1, [+I[a, mark1]]]", "+U[1, [+I[b, mark2]]]");
+
+        sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')]), (2, ARRAY[('c', 'mark3')])", table);
+        assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
+                .containsExactly("+I[2, [+I[c, mark3]]]");
+    }
 }

Reply via email to