This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e4ca384c6 [spark] Minimize the number of data splits that need to be scanned
when updating (#3147)
e4ca384c6 is described below
commit e4ca384c69e8fbeb2112af842f9bbb1023fc8c59
Author: Yann Byron <[email protected]>
AuthorDate: Tue Apr 2 20:30:01 2024 +0800
[spark] Minimize the number of data splits that need to be scanned when updating
(#3147)
---
.../apache/paimon/spark/SparkFilterConverter.java | 8 ++++++++
.../paimon/spark/procedure/CompactProcedure.java | 3 ++-
.../paimon/spark/PaimonBaseScanBuilder.scala | 16 +++++++--------
.../analysis/expressions/ExpressionHelper.scala | 24 +++++++++++++++-------
.../commands/DeleteFromPaimonTableCommand.scala | 4 +---
.../spark/commands/UpdatePaimonTableCommand.scala | 7 ++++---
.../paimon/spark/SparkFilterConverterTest.java | 19 +++++++++++++++++
7 files changed, 58 insertions(+), 23 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
index ddaedd322..b600a0275 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkFilterConverter.java
@@ -71,6 +71,14 @@ public class SparkFilterConverter {
this.builder = new PredicateBuilder(rowType);
}
+ public Predicate convertIgnoreFailure(Filter filter) {
+ try {
+ return convert(filter);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
public Predicate convert(Filter filter) {
if (filter instanceof EqualTo) {
EqualTo eq = (EqualTo) filter;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 639633644..1c5662025 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -189,7 +189,8 @@ public class CompactProcedure extends BaseProcedure {
condition == null
? null
:
ExpressionUtils.convertConditionToPaimonPredicate(
- condition, relation.output(),
table.rowType());
+ condition, relation.output(),
table.rowType(), false)
+ .getOrElse(null);
switch (bucketMode) {
case FIXED:
case DYNAMIC:
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 3f49ab0f7..0efe14552 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -60,18 +60,16 @@ abstract class PaimonBaseScanBuilder(table: Table)
val visitor = new PartitionPredicateVisitor(table.partitionKeys())
filters.foreach {
filter =>
- try {
- val predicate = converter.convert(filter)
+ val predicate = converter.convertIgnoreFailure(filter)
+ if (predicate == null) {
+ postScan.append(filter)
+ } else {
pushable.append((filter, predicate))
- if (!predicate.visit(visitor)) {
- postScan.append(filter)
- } else {
+ if (predicate.visit(visitor)) {
reserved.append(filter)
- }
- } catch {
- case e: UnsupportedOperationException =>
- logWarning(e.getMessage)
+ } else {
postScan.append(filter)
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 3e09557d5..8657c70ad 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -138,18 +138,28 @@ trait ExpressionHelper extends PredicateHelper {
def convertConditionToPaimonPredicate(
condition: Expression,
output: Seq[Attribute],
- rowType: RowType): Predicate = {
+ rowType: RowType,
+ ignoreFailure: Boolean = false): Option[Predicate] = {
val converter = new SparkFilterConverter(rowType)
val filters = normalizeExprs(Seq(condition), output)
- .flatMap(splitConjunctivePredicates(_).map {
+ .flatMap(splitConjunctivePredicates(_).flatMap {
f =>
- translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
- throw new RuntimeException("Exec update failed:" +
- s" cannot translate expression to source filter: $f"))
+ val filter = translateFilter(f, supportNestedPredicatePushdown =
true)
+ if (filter.isEmpty && !ignoreFailure) {
+ throw new RuntimeException(
+ "Exec update failed:" +
+ s" cannot translate expression to source filter: $f")
+ }
+ filter
})
.toArray
- val predicates = filters.map(converter.convert)
- PredicateBuilder.and(predicates: _*)
+
+ if (filters.isEmpty) {
+ None
+ } else {
+ val predicates = filters.map(converter.convertIgnoreFailure)
+ Some(PredicateBuilder.and(predicates: _*))
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index e4bf22d0f..84922abbc 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -51,9 +51,7 @@ trait DeleteFromPaimonTableCommandBase extends
PaimonLeafRunnableCommand with Pa
(None, false)
} else {
try {
- (
- Some(convertConditionToPaimonPredicate(condition(), relation.output,
table.rowType())),
- false)
+ (convertConditionToPaimonPredicate(condition(), relation.output,
table.rowType()), false)
} catch {
case NonFatal(_) =>
(None, true)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 3bc236a86..1c9fef77c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -158,9 +158,10 @@ case class UpdatePaimonTableCommand(
private def findCandidateDataSplits(): Seq[DataSplit] = {
val snapshotReader = table.newSnapshotReader()
- if (condition == TrueLiteral) {
- val filter = convertConditionToPaimonPredicate(condition,
relation.output, rowType)
- snapshotReader.withFilter(filter)
+ if (condition != TrueLiteral) {
+ val filter =
+ convertConditionToPaimonPredicate(condition, relation.output, rowType,
ignoreFailure = true)
+ filter.foreach(snapshotReader.withFilter)
}
snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
index 2cd1726af..3778d4575 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFilterConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.types.DateType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarCharType;
import org.apache.spark.sql.sources.EqualNullSafe;
import org.apache.spark.sql.sources.EqualTo;
@@ -36,6 +37,8 @@ import org.apache.spark.sql.sources.IsNotNull;
import org.apache.spark.sql.sources.IsNull;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.LessThanOrEqual;
+import org.apache.spark.sql.sources.Not;
+import org.apache.spark.sql.sources.StringStartsWith;
import org.junit.jupiter.api.Test;
import java.sql.Date;
@@ -43,10 +46,13 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowableOfType;
/** Test for {@link SparkFilterConverter}. */
public class SparkFilterConverterTest {
@@ -183,4 +189,17 @@ public class SparkFilterConverterTest {
assertThat(dateExpression).isEqualTo(rawExpression);
assertThat(localDateExpression).isEqualTo(rawExpression);
}
+
+ @Test
+ public void testIgnoreFailure() {
+ List<DataField> dataFields = new ArrayList<>();
+ dataFields.add(new DataField(0, "id", new IntType()));
+ dataFields.add(new DataField(1, "name", new
VarCharType(VarCharType.MAX_LENGTH)));
+ RowType rowType = new RowType(dataFields);
+ SparkFilterConverter converter = new SparkFilterConverter(rowType);
+
+ Not not = Not.apply(StringStartsWith.apply("name", "paimon"));
+ catchThrowableOfType(() -> converter.convert(not),
UnsupportedOperationException.class);
+ assertThat(converter.convertIgnoreFailure(not)).isNull();
+ }
}