This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 263b530502 Spark 3.5: Support Specifying spec_id in
RewriteManifestProcedure (#9242)
263b530502 is described below
commit 263b530502e5597b19b6b5e282917af8eede7600
Author: Pucheng Yang <[email protected]>
AuthorDate: Thu Dec 7 13:34:27 2023 -0800
Spark 3.5: Support Specifying spec_id in RewriteManifestProcedure (#9242)
---
.../extensions/TestRewriteManifestsProcedure.java | 35 ++++++++++++++++++++++
.../procedures/RewriteManifestsProcedure.java | 8 ++++-
2 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dc22fc4b70..8e64eaad44 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -331,4 +331,39 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
row(1, Timestamp.valueOf("2022-01-01 10:00:00"),
Date.valueOf("2022-01-01"))),
sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
}
+
+  @Test
+  public void testWriteManifestWithSpecId() {
+    sql(
+        "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
+        tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);
+
+    sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
+    sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0",
+        ImmutableList.of(row(0), row(0)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName); // evolves the table to spec 1
+    sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 3 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
+    assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);
+
+    output =
+        sql(
+            "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
+            catalogName, tableIdent);
+    assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+  }
}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index c8becc7e5a..e59077ae3d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
- ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
+ ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
+ ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
};
// counts are not nullable since the action result is never null
@@ -85,6 +86,7 @@ class RewriteManifestsProcedure extends BaseProcedure {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
+ Integer specId = args.isNullAt(2) ? null : args.getInt(2);
return modifyIcebergTable(
tableIdent,
@@ -95,6 +97,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
action.option(RewriteManifestsSparkAction.USE_CACHING,
useCaching.toString());
}
+ if (specId != null) {
+ action.specId(specId);
+ }
+
RewriteManifests.Result result = action.execute();
return toOutputRows(result);