This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 24b972296f Spark 3.3, 3.4: Add where filter to
rewrite_position_delete_files procedure (#8289)
24b972296f is described below
commit 24b972296f89376f62011102e02536e96bcfb852
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Aug 16 17:26:09 2023 -0700
Spark 3.3, 3.4: Add where filter to rewrite_position_delete_files procedure
(#8289)
---
.../TestRewritePositionDeleteFilesProcedure.java | 70 +++++++++++++++++++---
.../iceberg/spark/procedures/BaseProcedure.java | 14 +++++
.../RewritePositionDeleteFilesProcedure.java | 17 +++++-
.../TestRewritePositionDeleteFilesProcedure.java | 70 +++++++++++++++++++---
.../iceberg/spark/procedures/BaseProcedure.java | 14 +++++
.../RewritePositionDeleteFilesProcedure.java | 17 +++++-
6 files changed, 184 insertions(+), 18 deletions(-)
diff --git
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 88715fd89c..0bc2bb9961 100644
---
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -41,19 +41,36 @@ public class TestRewritePositionDeleteFilesProcedure
extends SparkExtensionsTest
}
private void createTable() throws Exception {
+ createTable(false);
+ }
+
+ private void createTable(boolean partitioned) throws Exception {
+ String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
sql(
- "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ "CREATE TABLE %s (id bigint, data string) USING iceberg %s
TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ tableName, partitionStmt);
List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
- new SimpleRecord(2, "b"),
- new SimpleRecord(3, "c"),
- new SimpleRecord(4, "d"),
- new SimpleRecord(5, "e"),
- new SimpleRecord(6, "f"));
+ new SimpleRecord(1, "b"),
+ new SimpleRecord(1, "c"),
+ new SimpleRecord(2, "d"),
+ new SimpleRecord(2, "e"),
+ new SimpleRecord(2, "f"),
+ new SimpleRecord(3, "g"),
+ new SimpleRecord(3, "h"),
+ new SimpleRecord(3, "i"),
+ new SimpleRecord(4, "j"),
+ new SimpleRecord(4, "k"),
+ new SimpleRecord(4, "l"),
+ new SimpleRecord(5, "m"),
+ new SimpleRecord(5, "n"),
+ new SimpleRecord(5, "o"),
+ new SimpleRecord(6, "p"),
+ new SimpleRecord(6, "q"),
+ new SimpleRecord(6, "r"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
@@ -130,6 +147,45 @@ public class TestRewritePositionDeleteFilesProcedure
extends SparkExtensionsTest
output);
}
+ @Test
+ public void testExpireDeleteFilesFilter() throws Exception {
+ createTable(true);
+
+ sql("DELETE FROM %s WHERE id = 1 and data='a'", tableName);
+ sql("DELETE FROM %s WHERE id = 1 and data='b'", tableName);
+ sql("DELETE FROM %s WHERE id = 2 and data='d'", tableName);
+ sql("DELETE FROM %s WHERE id = 2 and data='e'", tableName);
+ sql("DELETE FROM %s WHERE id = 3 and data='g'", tableName);
+ sql("DELETE FROM %s WHERE id = 3 and data='h'", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertEquals(6, TestHelpers.deleteFiles(table).size());
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ // data filter is ignored as it cannot be applied to position deletes
+ + "where => 'id IN (1, 2) AND data=\"bar\"',"
+ + "options => map("
+ + "'rewrite-all','true'))",
+ catalogName, tableIdent);
+ table.refresh();
+
+ Map<String, String> snapshotSummary = snapshotSummary();
+ assertEquals(
+ "Should delete 4 delete files and add 2",
+ ImmutableList.of(
+ row(
+ 4,
+ 2,
+ Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+ Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+ output);
+
+ Assert.assertEquals(4, TestHelpers.deleteFiles(table).size());
+ }
+
@Test
public void testInvalidOption() throws Exception {
createTable();
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index ed0156adc5..d7c01936ee 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,6 +33,7 @@ import
org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -44,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.execution.CacheManager;
+import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -160,6 +163,17 @@ abstract class BaseProcedure implements Procedure {
cacheManager.recacheByPlan(spark, relation);
}
+ protected Expression filterExpression(Identifier ident, String where) {
+ try {
+ String name = Spark3Util.quotedFullIdentifier(tableCatalog.name(), ident);
+ org.apache.spark.sql.catalyst.expressions.Expression expression =
+ SparkExpressionConverter.collectResolvedSparkExpression(spark, name, where);
+ return SparkExpressionConverter.convertToIcebergExpression(expression);
+ } catch (AnalysisException e) {
+ throw new IllegalArgumentException("Cannot parse predicates in where
option: " + where, e);
+ }
+ }
+
protected InternalRow newInternalRow(Object... values) {
return new GenericInternalRow(values);
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
index a4a3f63ba7..3d5e45ce8b 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark.procedures;
import java.util.Map;
import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -42,9 +44,11 @@ public class RewritePositionDeleteFilesProcedure extends
BaseProcedure {
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter OPTIONS_PARAM =
ProcedureParameter.optional("options", STRING_MAP);
+ private static final ProcedureParameter WHERE_PARAM =
+ ProcedureParameter.optional("where", DataTypes.StringType);
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+ new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM, WHERE_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -85,11 +89,20 @@ public class RewritePositionDeleteFilesProcedure extends
BaseProcedure {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
Map<String, String> options = input.asStringMap(OPTIONS_PARAM,
ImmutableMap.of());
+ String where = input.asString(WHERE_PARAM, null);
return modifyIcebergTable(
tableIdent,
table -> {
- Result result = actions().rewritePositionDeletes(table).options(options).execute();
+ RewritePositionDeleteFiles action =
+ actions().rewritePositionDeletes(table).options(options);
+
+ if (where != null) {
+ Expression whereExpression = filterExpression(tableIdent, where);
+ action = action.filter(whereExpression);
+ }
+
+ Result result = action.execute();
return new InternalRow[] {toOutputRow(result)};
});
}
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
index 88715fd89c..0bc2bb9961 100644
---
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -41,19 +41,36 @@ public class TestRewritePositionDeleteFilesProcedure
extends SparkExtensionsTest
}
private void createTable() throws Exception {
+ createTable(false);
+ }
+
+ private void createTable(boolean partitioned) throws Exception {
+ String partitionStmt = partitioned ? "PARTITIONED BY (id)" : "";
sql(
- "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ "CREATE TABLE %s (id bigint, data string) USING iceberg %s
TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
- tableName);
+ tableName, partitionStmt);
List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
- new SimpleRecord(2, "b"),
- new SimpleRecord(3, "c"),
- new SimpleRecord(4, "d"),
- new SimpleRecord(5, "e"),
- new SimpleRecord(6, "f"));
+ new SimpleRecord(1, "b"),
+ new SimpleRecord(1, "c"),
+ new SimpleRecord(2, "d"),
+ new SimpleRecord(2, "e"),
+ new SimpleRecord(2, "f"),
+ new SimpleRecord(3, "g"),
+ new SimpleRecord(3, "h"),
+ new SimpleRecord(3, "i"),
+ new SimpleRecord(4, "j"),
+ new SimpleRecord(4, "k"),
+ new SimpleRecord(4, "l"),
+ new SimpleRecord(5, "m"),
+ new SimpleRecord(5, "n"),
+ new SimpleRecord(5, "o"),
+ new SimpleRecord(6, "p"),
+ new SimpleRecord(6, "q"),
+ new SimpleRecord(6, "r"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
@@ -130,6 +147,45 @@ public class TestRewritePositionDeleteFilesProcedure
extends SparkExtensionsTest
output);
}
+ @Test
+ public void testExpireDeleteFilesFilter() throws Exception {
+ createTable(true);
+
+ sql("DELETE FROM %s WHERE id = 1 and data='a'", tableName);
+ sql("DELETE FROM %s WHERE id = 1 and data='b'", tableName);
+ sql("DELETE FROM %s WHERE id = 2 and data='d'", tableName);
+ sql("DELETE FROM %s WHERE id = 2 and data='e'", tableName);
+ sql("DELETE FROM %s WHERE id = 3 and data='g'", tableName);
+ sql("DELETE FROM %s WHERE id = 3 and data='h'", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertEquals(6, TestHelpers.deleteFiles(table).size());
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.rewrite_position_delete_files("
+ + "table => '%s',"
+ // data filter is ignored as it cannot be applied to position deletes
+ + "where => 'id IN (1, 2) AND data=\"bar\"',"
+ + "options => map("
+ + "'rewrite-all','true'))",
+ catalogName, tableIdent);
+ table.refresh();
+
+ Map<String, String> snapshotSummary = snapshotSummary();
+ assertEquals(
+ "Should delete 4 delete files and add 2",
+ ImmutableList.of(
+ row(
+ 4,
+ 2,
+ Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+ Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+ output);
+
+ Assert.assertEquals(4, TestHelpers.deleteFiles(table).size());
+ }
+
@Test
public void testInvalidOption() throws Exception {
createTable();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index ed0156adc5..d7c01936ee 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,6 +33,7 @@ import
org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -44,6 +46,7 @@ import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.execution.CacheManager;
+import org.apache.spark.sql.execution.datasources.SparkExpressionConverter;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -160,6 +163,17 @@ abstract class BaseProcedure implements Procedure {
cacheManager.recacheByPlan(spark, relation);
}
+ protected Expression filterExpression(Identifier ident, String where) {
+ try {
+ String name = Spark3Util.quotedFullIdentifier(tableCatalog.name(), ident);
+ org.apache.spark.sql.catalyst.expressions.Expression expression =
+ SparkExpressionConverter.collectResolvedSparkExpression(spark, name, where);
+ return SparkExpressionConverter.convertToIcebergExpression(expression);
+ } catch (AnalysisException e) {
+ throw new IllegalArgumentException("Cannot parse predicates in where
option: " + where, e);
+ }
+ }
+
protected InternalRow newInternalRow(Object... values) {
return new GenericInternalRow(values);
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
index a4a3f63ba7..3d5e45ce8b 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -20,7 +20,9 @@ package org.apache.iceberg.spark.procedures;
import java.util.Map;
import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -42,9 +44,11 @@ public class RewritePositionDeleteFilesProcedure extends
BaseProcedure {
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter OPTIONS_PARAM =
ProcedureParameter.optional("options", STRING_MAP);
+ private static final ProcedureParameter WHERE_PARAM =
+ ProcedureParameter.optional("where", DataTypes.StringType);
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+ new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM, WHERE_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -85,11 +89,20 @@ public class RewritePositionDeleteFilesProcedure extends
BaseProcedure {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
Map<String, String> options = input.asStringMap(OPTIONS_PARAM,
ImmutableMap.of());
+ String where = input.asString(WHERE_PARAM, null);
return modifyIcebergTable(
tableIdent,
table -> {
- Result result = actions().rewritePositionDeletes(table).options(options).execute();
+ RewritePositionDeleteFiles action =
+ actions().rewritePositionDeletes(table).options(options);
+
+ if (where != null) {
+ Expression whereExpression = filterExpression(tableIdent, where);
+ action = action.filter(whereExpression);
+ }
+
+ Result result = action.execute();
return new InternalRow[] {toOutputRow(result)};
});
}