This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e4ca384c6 [spark] Minimize the number of data splits need to be scanned 
when update (#3147)
e4ca384c6 is described below

commit e4ca384c69e8fbeb2112af842f9bbb1023fc8c59
Author: Yann Byron <[email protected]>
AuthorDate: Tue Apr 2 20:30:01 2024 +0800

    [spark] Minimize the number of data splits need to be scanned when update 
(#3147)
---
 .../apache/paimon/spark/SparkFilterConverter.java  |  8 ++++++++
 .../paimon/spark/procedure/CompactProcedure.java   |  3 ++-
 .../paimon/spark/PaimonBaseScanBuilder.scala       | 16 +++++++--------
 .../analysis/expressions/ExpressionHelper.scala    | 24 +++++++++++++++-------
 .../commands/DeleteFromPaimonTableCommand.scala    |  4 +---
 .../spark/commands/UpdatePaimonTableCommand.scala  |  7 ++++---
 .../paimon/spark/SparkFilterConverterTest.java     | 19 +++++++++++++++++
 7 files changed, 58 insertions(+), 23 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
index ddaedd322..b600a0275 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
@@ -71,6 +71,14 @@ public class SparkFilterConverter {
         this.builder = new PredicateBuilder(rowType);
     }
 
+    public Predicate convertIgnoreFailure(Filter filter) {
+        try {
+            return convert(filter);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     public Predicate convert(Filter filter) {
         if (filter instanceof EqualTo) {
             EqualTo eq = (EqualTo) filter;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 639633644..1c5662025 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -189,7 +189,8 @@ public class CompactProcedure extends BaseProcedure {
                     condition == null
                             ? null
                             : 
ExpressionUtils.convertConditionToPaimonPredicate(
-                                    condition, relation.output(), 
table.rowType());
+                                            condition, relation.output(), 
table.rowType(), false)
+                                    .getOrElse(null);
             switch (bucketMode) {
                 case FIXED:
                 case DYNAMIC:
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 3f49ab0f7..0efe14552 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -60,18 +60,16 @@ abstract class PaimonBaseScanBuilder(table: Table)
     val visitor = new PartitionPredicateVisitor(table.partitionKeys())
     filters.foreach {
       filter =>
-        try {
-          val predicate = converter.convert(filter)
+        val predicate = converter.convertIgnoreFailure(filter)
+        if (predicate == null) {
+          postScan.append(filter)
+        } else {
           pushable.append((filter, predicate))
-          if (!predicate.visit(visitor)) {
-            postScan.append(filter)
-          } else {
+          if (predicate.visit(visitor)) {
             reserved.append(filter)
-          }
-        } catch {
-          case e: UnsupportedOperationException =>
-            logWarning(e.getMessage)
+          } else {
             postScan.append(filter)
+          }
         }
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 3e09557d5..8657c70ad 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -138,18 +138,28 @@ trait ExpressionHelper extends PredicateHelper {
   def convertConditionToPaimonPredicate(
       condition: Expression,
       output: Seq[Attribute],
-      rowType: RowType): Predicate = {
+      rowType: RowType,
+      ignoreFailure: Boolean = false): Option[Predicate] = {
     val converter = new SparkFilterConverter(rowType)
     val filters = normalizeExprs(Seq(condition), output)
-      .flatMap(splitConjunctivePredicates(_).map {
+      .flatMap(splitConjunctivePredicates(_).flatMap {
         f =>
-          translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
-            throw new RuntimeException("Exec update failed:" +
-              s" cannot translate expression to source filter: $f"))
+          val filter = translateFilter(f, supportNestedPredicatePushdown = 
true)
+          if (filter.isEmpty && !ignoreFailure) {
+            throw new RuntimeException(
+              "Exec update failed:" +
+                s" cannot translate expression to source filter: $f")
+          }
+          filter
       })
       .toArray
-    val predicates = filters.map(converter.convert)
-    PredicateBuilder.and(predicates: _*)
+
+    if (filters.isEmpty) {
+      None
+    } else {
+      val predicates = filters.map(converter.convertIgnoreFailure)
+      Some(PredicateBuilder.and(predicates: _*))
+    }
   }
 }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index e4bf22d0f..84922abbc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -51,9 +51,7 @@ trait DeleteFromPaimonTableCommandBase extends 
PaimonLeafRunnableCommand with Pa
       (None, false)
     } else {
       try {
-        (
-          Some(convertConditionToPaimonPredicate(condition(), relation.output, 
table.rowType())),
-          false)
+        (convertConditionToPaimonPredicate(condition(), relation.output, 
table.rowType()), false)
       } catch {
         case NonFatal(_) =>
           (None, true)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 3bc236a86..1c9fef77c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -158,9 +158,10 @@ case class UpdatePaimonTableCommand(
 
   private def findCandidateDataSplits(): Seq[DataSplit] = {
     val snapshotReader = table.newSnapshotReader()
-    if (condition == TrueLiteral) {
-      val filter = convertConditionToPaimonPredicate(condition, 
relation.output, rowType)
-      snapshotReader.withFilter(filter)
+    if (condition != TrueLiteral) {
+      val filter =
+        convertConditionToPaimonPredicate(condition, relation.output, rowType, 
ignoreFailure = true)
+      filter.foreach(snapshotReader.withFilter)
     }
 
     snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
index 2cd1726af..3778d4575 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
 
 import org.apache.spark.sql.sources.EqualNullSafe;
 import org.apache.spark.sql.sources.EqualTo;
@@ -36,6 +37,8 @@ import org.apache.spark.sql.sources.IsNotNull;
 import org.apache.spark.sql.sources.IsNull;
 import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.StringStartsWith;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Date;
@@ -43,10 +46,13 @@ import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowableOfType;
 
 /** Test for {@link SparkFilterConverter}. */
 public class SparkFilterConverterTest {
@@ -183,4 +189,17 @@ public class SparkFilterConverterTest {
         assertThat(dateExpression).isEqualTo(rawExpression);
         assertThat(localDateExpression).isEqualTo(rawExpression);
     }
+
+    @Test
+    public void testIgnoreFailure() {
+        List<DataField> dataFields = new ArrayList<>();
+        dataFields.add(new DataField(0, "id", new IntType()));
+        dataFields.add(new DataField(1, "name", new 
VarCharType(VarCharType.MAX_LENGTH)));
+        RowType rowType = new RowType(dataFields);
+        SparkFilterConverter converter = new SparkFilterConverter(rowType);
+
+        Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
+        catchThrowableOfType(() -> converter.convert(not), 
UnsupportedOperationException.class);
+        assertThat(converter.convertIgnoreFailure(not)).isNull();
+    }
 }

Reply via email to