This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2817dd4af9 Spark 3.4: Push down system functions by V2 filters for rewriting DataFiles and PositionDeleteFiles (#8560)
2817dd4af9 is described below
commit 2817dd4af905b5c18eb6aaf84a36a08fa012dc55
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Sep 18 13:14:53 2023 -0700
Spark 3.4: Push down system functions by V2 filters for rewriting DataFiles and PositionDeleteFiles (#8560)
---
.../extensions/TestRewriteDataFilesProcedure.java | 78 +++++++++++++++++++++-
.../procedures/RewriteDataFilesProcedure.java | 20 ++----
.../datasources/SparkExpressionConverter.scala | 9 +--
.../spark/actions/TestRewriteDataFilesAction.java | 28 +++++++-
4 files changed, 114 insertions(+), 21 deletions(-)
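For orientation before the diffs: the user-facing effect of this change is that rewrite_data_files can now take Iceberg system functions (bucket, truncate, years, months, days, hours) in its where clause and push them down as V2 filters. Below is a minimal sketch of such a call, assuming a Spark session already configured with Iceberg extensions, an Iceberg catalog named my_catalog, and a table db.sample bucket-partitioned on c2 (catalog and table names are illustrative, not from this commit). Note the new tests skip the Spark session catalog because it still resolves these calls to v1 functions.

    import org.apache.spark.sql.SparkSession;

    public class RewriteWithSystemFunctionFilter {
      public static void main(String[] args) {
        // Assumes Iceberg extensions and the 'my_catalog' Iceberg catalog are configured.
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Compact only the files in the bucket(2, c2) = 0 partition; the system function
        // in the where clause is now translated through Spark's V2 predicate API.
        spark.sql(
            "CALL my_catalog.system.rewrite_data_files("
                + "table => 'db.sample', "
                + "where => 'my_catalog.system.bucket(2, c2) = 0')");

        spark.stop();
      }
    }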
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 8013967181..2449c20ab9 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkTableCache;
+import org.apache.iceberg.spark.SystemFunctionPushDownHelper;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
@@ -395,6 +396,38 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteDataFilesWithFilterOnOnBucketExpression() {
+ // currently spark session catalog only resolve to v1 functions instead of desired v2 functions
+ // https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
+ Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
+ createBucketPartitionTable();
+ // create 5 files for each partition (c2 = 'foo' and c2 = 'bar')
+ insertData(10);
+ List<Object[]> expectedRecords = currentData();
+
+ // select only 5 files for compaction (files in the partition c2 = 'bar')
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.bucket(2, c2) = 0')",
+ catalogName, tableIdent, catalogName);
+
+ assertEquals(
+ "Action should rewrite 5 data files from single matching partition"
+ + "(containing bucket(c2) = 0) and add 1 data files",
+ row(5, 1),
+ row(output.get(0)[0], output.get(0)[1]));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(4);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+ .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+ }
+
@Test
public void testRewriteDataFilesWithInFilterOnPartitionTable() {
createPartitionTable();
@@ -480,7 +513,6 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
sql(
"CALL %s.system.rewrite_data_files(table => '%s'," + " where => 'c2 like \"%s\"')",
catalogName, tableIdent, "car%");
-
// TODO: Enable when org.apache.iceberg.spark.SparkFilters have implementations for
// StringEndsWith & StringContains
// StringEndsWith
@@ -491,6 +523,39 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
// " where => 'c2 like \"%s\"')", catalogName, tableIdent, "%car%");
}
+ @Test
+ public void testRewriteDataFilesWithPossibleV2Filters() {
+ // currently spark session catalog only resolve to v1 functions instead of desired v2 functions
+ // https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2070-L2083
+ Assume.assumeFalse(catalogName.equals(SparkCatalogConfig.SPARK.catalogName()));
+
+ SystemFunctionPushDownHelper.createPartitionedTable(spark, tableName, "id");
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.bucket(2, data) >= 0')",
+ catalogName, tableIdent, catalogName);
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.truncate(4, id) >= 1')",
+ catalogName, tableIdent, catalogName);
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.years(ts) >= 1')",
+ catalogName, tableIdent, catalogName);
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.months(ts) >= 1')",
+ catalogName, tableIdent, catalogName);
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.days(ts) >= date(\"2023-01-01\")')",
+ catalogName, tableIdent, catalogName);
+ sql(
+ "CALL %s.system.rewrite_data_files(table => '%s',"
+ + " where => '%s.system.hours(ts) >= 1')",
+ catalogName, tableIdent, catalogName);
+ }
+
@Test
public void testRewriteDataFilesWithInvalidInputs() {
createTable();
@@ -778,6 +843,17 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
}
+ private void createBucketPartitionTable() {
+ sql(
+ "CREATE TABLE %s (c1 int, c2 string, c3 string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(2, c2)) "
+ + "TBLPROPERTIES ('%s' '%s')",
+ tableName,
+ TableProperties.WRITE_DISTRIBUTION_MODE,
+ TableProperties.WRITE_DISTRIBUTION_MODE_NONE);
+ }
+
private void insertData(int filesCount) {
insertData(tableName, filesCount);
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 07e3f6232c..14246b47ac 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -24,20 +24,17 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.ExtendedParser;
-import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
-import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -103,8 +100,6 @@ class RewriteDataFilesProcedure extends BaseProcedure {
return modifyIcebergTable(
tableIdent,
table -> {
- String quotedFullIdentifier =
- Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
RewriteDataFiles action = actions().rewriteDataFiles(table);
String strategy = args.isNullAt(1) ? null : args.getString(1);
@@ -120,7 +115,7 @@ class RewriteDataFilesProcedure extends BaseProcedure {
String where = args.isNullAt(4) ? null : args.getString(4);
- action = checkAndApplyFilter(action, where, quotedFullIdentifier);
+ action = checkAndApplyFilter(action, where, tableIdent);
RewriteDataFiles.Result result = action.execute();
@@ -129,15 +124,10 @@ class RewriteDataFilesProcedure extends BaseProcedure {
}
private RewriteDataFiles checkAndApplyFilter(
- RewriteDataFiles action, String where, String tableName) {
+ RewriteDataFiles action, String where, Identifier ident) {
if (where != null) {
- try {
- Expression expression =
- SparkExpressionConverter.collectResolvedSparkExpression(spark(), tableName, where);
- return action.filter(SparkExpressionConverter.convertToIcebergExpression(expression));
- } catch (AnalysisException e) {
- throw new IllegalArgumentException("Cannot parse predicates in where option: " + where, e);
- }
+ Expression expression = filterExpression(ident, where);
+ return action.filter(expression);
}
return action;
}
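With the change above, the procedure hands the where clause to a filterExpression(ident, where) helper (defined outside this diff) and applies the result via RewriteDataFiles.filter. The same entry point is available when driving the action directly; below is a minimal sketch, assuming an active SparkSession and an already-loaded Iceberg Table, mirroring the bucket filter used by the new test in TestRewriteDataFilesAction further below.

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.RewriteDataFiles;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class RewriteActionFilterSketch {
      // Rewrites only data files whose c2 values fall into bucket 0 of a 2-bucket transform.
      static RewriteDataFiles.Result rewriteBucketZero(Table table) {
        return SparkActions.get()
            .rewriteDataFiles(table)
            .filter(Expressions.equal(Expressions.bucket("c2", 2), 0))
            .execute();
      }
    }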
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
index 9f53eae60a..4903a100f9 100644
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
+++ b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkExpressionConverter.scala
@@ -19,7 +19,7 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.iceberg.spark.SparkFilters
+import org.apache.iceberg.spark.SparkV2Filters
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -28,14 +28,15 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
object SparkExpressionConverter {
def convertToIcebergExpression(sparkExpression: Expression): org.apache.iceberg.expressions.Expression = {
- // Currently, it is a double conversion as we are converting Spark expression to Spark filter
- // and then converting Spark filter to Iceberg expression.
+ // Currently, it is a double conversion as we are converting Spark expression to Spark predicate
+ // and then converting Spark predicate to Iceberg expression.
// But these two conversions already exist and well tested. So, we are going with this approach.
- SparkFilters.convert(DataSourceStrategy.translateFilter(sparkExpression, supportNestedPredicatePushdown = true).get)
+ SparkV2Filters.convert(DataSourceV2Strategy.translateFilterV2(sparkExpression).get)
}
@throws[AnalysisException]
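The converter change above is what the procedure relies on: the where string is first resolved into a Spark catalyst expression against the target table, then translated Spark expression -> Spark V2 predicate -> Iceberg expression. Below is a minimal sketch of driving that two-step conversion from Java, using only the two converter methods shown in this diff (the table name and where string are placeholders).

    import org.apache.iceberg.expressions.Expression;
    import org.apache.spark.sql.AnalysisException;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;

    public class WhereClauseToIcebergExpression {
      static Expression toIcebergExpression(SparkSession spark, String tableName, String where)
          throws AnalysisException {
        // Resolve the free-form where clause against the table's schema ...
        org.apache.spark.sql.catalyst.expressions.Expression resolved =
            SparkExpressionConverter.collectResolvedSparkExpression(spark, tableName, where);
        // ... then convert: Spark expression -> Spark V2 predicate -> Iceberg expression.
        return SparkExpressionConverter.convertToIcebergExpression(resolved);
      }
    }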
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 1581e58364..bfffa65acc 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -223,6 +223,32 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testBinPackWithFilterOnBucketExpression() {
+ Table table = createTablePartitioned(4, 2);
+
+ shouldHaveFiles(table, 8);
+ List<Object[]> expectedRecords = currentData();
+ long dataSizeBefore = testDataSize(table);
+
+ Result result =
+ basicRewrite(table)
+ .filter(Expressions.equal("c1", 1))
+ .filter(Expressions.equal(Expressions.bucket("c2", 2), 0))
+ .execute();
+
+ assertThat(result)
+ .extracting(Result::rewrittenDataFilesCount, Result::addedDataFilesCount)
+ .as("Action should rewrite 2 data files into 1 data file")
+ .contains(2, 1);
+ assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
+
+ shouldHaveFiles(table, 7);
+
+ List<Object[]> actualRecords = currentData();
+ assertEquals("Rows must match", expectedRecords, actualRecords);
+ }
+
@Test
public void testBinPackAfterPartitionChange() {
Table table = createTable();
@@ -260,7 +286,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
}
@Test
- public void testBinPackWithDeletes() throws Exception {
+ public void testBinPackWithDeletes() {
Table table = createTablePartitioned(4, 2);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
shouldHaveFiles(table, 8);