This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 35acc4c2d [codegen] EqualiserCodeGenerator supports ARRAY<ROW> (#3023)
35acc4c2d is described below
commit 35acc4c2de17c6b36b985b73fdfe28b922971bec
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 15 18:28:04 2024 +0800
[codegen] EqualiserCodeGenerator supports ARRAY<ROW> (#3023)
---
.../paimon/codegen/EqualiserCodeGenerator.scala | 18 +++-----------
.../apache/paimon/codegen/ScalarOperatorGens.scala | 26 +++++++++++++++++++
.../flink/FullCompactionFileStoreITCase.java | 29 ++++++++++++++++++++++
3 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala
index b76f170a6..41c7427af 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/EqualiserCodeGenerator.scala
@@ -19,9 +19,9 @@
package org.apache.paimon.codegen
import org.apache.paimon.codegen.GenerateUtils._
-import org.apache.paimon.codegen.ScalarOperatorGens.generateEquals
+import org.apache.paimon.codegen.ScalarOperatorGens.{generateEquals, generateRowEqualiser}
import org.apache.paimon.types.{BooleanType, DataType, RowType}
-import org.apache.paimon.types.DataTypeChecks.{getFieldTypes, isCompositeType}
+import org.apache.paimon.types.DataTypeChecks.isCompositeType
import org.apache.paimon.types.DataTypeRoot._
import org.apache.paimon.utils.TypeUtils.isPrimitive
@@ -136,19 +136,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType]) {
if (isInternalPrimitive(fieldType)) {
("", s"$leftFieldTerm == $rightFieldTerm")
} else if (isCompositeType(fieldType)) {
- val equaliserGenerator =
- new EqualiserCodeGenerator(getFieldTypes(fieldType).asScala.toArray)
- val generatedEqualiser = equaliserGenerator.generateRecordEqualiser("fieldGeneratedEqualiser")
- val generatedEqualiserTerm =
- ctx.addReusableObject(generatedEqualiser, "fieldGeneratedEqualiser")
- val equaliserTypeTerm = classOf[RecordEqualiser].getCanonicalName
- val equaliserTerm = newName("equaliser")
- ctx.addReusableMember(s"private $equaliserTypeTerm $equaliserTerm = null;")
- ctx.addReusableInitStatement(
- s"""
- |$equaliserTerm = ($equaliserTypeTerm)
- | $generatedEqualiserTerm.newInstance(this.getClass().getClassLoader());
- |""".stripMargin)
+ val equaliserTerm = generateRowEqualiser(ctx, fieldType)
("", s"$equaliserTerm.equals($leftFieldTerm, $rightFieldTerm)")
} else {
val left = GeneratedExpression(leftFieldTerm, leftNullTerm, "", fieldType)
diff --git a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
index 841eea331..5dfa4bff6 100644
--- a/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
+++ b/paimon-codegen/src/main/scala/org/apache/paimon/codegen/ScalarOperatorGens.scala
@@ -21,10 +21,13 @@ package org.apache.paimon.codegen
import org.apache.paimon.codegen.GenerateUtils._
import org.apache.paimon.data.serializer.InternalMapSerializer
import org.apache.paimon.types._
+import org.apache.paimon.types.DataTypeChecks.{getFieldTypes, isCompositeType}
import org.apache.paimon.utils.InternalRowUtils
import org.apache.paimon.utils.TypeCheckUtils._
import org.apache.paimon.utils.TypeUtils.isInteroperable
+import scala.collection.JavaConverters._
+
/**
* Utilities to generate SQL scalar operators, e.g. arithmetic operator, compare operator, equal
* operator, etc.
@@ -78,6 +81,10 @@ object ScalarOperatorGens {
// comparable types of same type
else if (isComparable(left.resultType) && canEqual) {
generateComparison(ctx, "==", left, right, resultType)
+ } else if (isCompositeType(left.resultType) && canEqual) {
+ val equaliserTerm = generateRowEqualiser(ctx, left.resultType)
+ generateOperatorIfNotNull(ctx, resultType, left, right)(
+ (leftTerm, rightTerm) => s"$equaliserTerm.equals($leftTerm, $rightTerm)")
}
// non comparable types
else {
@@ -95,6 +102,25 @@ object ScalarOperatorGens {
}
}
+ /** Generates [[RecordEqualiser]] code for row and return equaliser name. */
+ def generateRowEqualiser(ctx: CodeGeneratorContext, fieldType: DataType): String = {
+ val equaliserGenerator =
+ new EqualiserCodeGenerator(getFieldTypes(fieldType).asScala.toArray)
+ val generatedEqualiser =
+ equaliserGenerator.generateRecordEqualiser("fieldGeneratedEqualiser")
+ val generatedEqualiserTerm =
+ ctx.addReusableObject(generatedEqualiser, "fieldGeneratedEqualiser")
+ val equaliserTypeTerm = classOf[RecordEqualiser].getCanonicalName
+ val equaliserTerm = newName("equaliser")
+ ctx.addReusableMember(s"private $equaliserTypeTerm $equaliserTerm = null;")
+ ctx.addReusableInitStatement(
+ s"""
+ |$equaliserTerm = ($equaliserTypeTerm)
+ | $generatedEqualiserTerm.newInstance(this.getClass().getClassLoader());
+ |""".stripMargin)
+ equaliserTerm
+ }
+
/** Generates comparison code for numeric types and comparable types of same type. */
def generateComparison(
ctx: CodeGeneratorContext,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
index e23dff160..a40587782 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
@@ -164,4 +164,33 @@ public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
assertThat(sql("SELECT * FROM %s$audit_log", table))
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
}
+
+ @Test
+ public void testRowDeduplicateWithArrayRow() throws Exception {
+ String table = "T_ARRAY_ROW";
+ tEnv.executeSql(
+ "CREATE TABLE IF NOT EXISTS "
+ + table
+ + "("
+ + "ID INT PRIMARY KEY NOT ENFORCED,\n"
+ + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
+ + ") WITH ("
+ + "'changelog-producer'='full-compaction',"
+ + "'changelog-producer.compaction-interval' = '1s',"
+ + "'changelog-producer.row-deduplicate' = 'true')");
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+ sql("INSERT INTO %s VALUES (1, ARRAY[('a','mark1')]);", table);
+ assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("+I[1, [+I[a, mark1]]]");
+
+ sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')])", table);
+ assertThat(iterator.collect(2).stream().map(Row::toString).collect(Collectors.toList()))
+ .containsExactly("-U[1, [+I[a, mark1]]]", "+U[1, [+I[b, mark2]]]");
+
+ sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')]), (2, ARRAY[('c', 'mark3')])", table);
+ assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
+ .containsExactly("+I[2, [+I[c, mark3]]]");
+ }
}