This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f5f4d924b Spark 3.2, 3.3, 3.4: Support specifying spec_id in 
RewriteManifestProcedure (#9243)(#9242)
3f5f4d924b is described below

commit 3f5f4d924be94c6f06793a3667c8da95c52c4064
Author: Pucheng Yang <[email protected]>
AuthorDate: Fri Dec 8 07:20:26 2023 -0800

    Spark 3.2, 3.3, 3.4: Support specifying spec_id in RewriteManifestProcedure 
(#9243)(#9242)
---
 .../extensions/TestRewriteManifestsProcedure.java  | 35 ++++++++++++++++++++++
 .../procedures/RewriteManifestsProcedure.java      |  8 ++++-
 .../extensions/TestRewriteManifestsProcedure.java  | 35 ++++++++++++++++++++++
 .../procedures/RewriteManifestsProcedure.java      |  8 ++++-
 .../extensions/TestRewriteManifestsProcedure.java  | 35 ++++++++++++++++++++++
 .../procedures/RewriteManifestsProcedure.java      |  8 ++++-
 6 files changed, 126 insertions(+), 3 deletions(-)

diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 40625b5e34..2675c1010b 100644
--- 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -339,4 +339,39 @@ public class TestRewriteManifestsProcedure extends 
SparkExtensionsTestBase {
             row(1, Timestamp.valueOf("2022-01-01 10:00:00"), 
Date.valueOf("2022-01-01"))),
         sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
   }
+
+  /**
+   * Verifies that rewrite_manifests only rewrites manifests of the requested partition spec when
+   * {@code spec_id} is passed, leaving manifests of other specs and the table data untouched.
+   */
+  @Test
+  public void testWriteManifestWithSpecId() {
+    sql(
+        "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
+        tableName);
+    // Disable manifest merging so that every INSERT below leaves its own manifest behind.
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);
+
+    sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
+    sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0",
+        ImmutableList.of(row(0), row(0)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName);
+    sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 3 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    // Without spec_id nothing is rewritten here -- presumably because the action defaults to the
+    // table's current spec (1), which owns only a single manifest.
+    List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
+    assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);
+
+    // Targeting spec 0 should compact its two manifests into one.
+    output =
+        sql(
+            "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
+            catalogName, tableIdent);
+    assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    // Rewriting manifests is a metadata-only operation and must not change the table contents.
+    assertEquals(
+        "Table data should be unchanged after rewriting manifests",
+        ImmutableList.of(
+            row(1, "2024-01-01", "00"), row(2, "2024-01-01", "00"), row(3, "2024-01-01", "00")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index c8becc7e5a..e59077ae3d 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
+        ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
+        ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
       };
 
   // counts are not nullable since the action result is never null
@@ -85,6 +86,7 @@ class RewriteManifestsProcedure extends BaseProcedure {
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
     Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
+    Integer specId = args.isNullAt(2) ? null : args.getInt(2);
 
     return modifyIcebergTable(
         tableIdent,
@@ -95,6 +97,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
             action.option(RewriteManifestsSparkAction.USE_CACHING, 
useCaching.toString());
           }
 
+          if (specId != null) {
+            action.specId(specId);
+          }
+
           RewriteManifests.Result result = action.execute();
 
           return toOutputRows(result);
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 40625b5e34..2675c1010b 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -339,4 +339,39 @@ public class TestRewriteManifestsProcedure extends 
SparkExtensionsTestBase {
             row(1, Timestamp.valueOf("2022-01-01 10:00:00"), 
Date.valueOf("2022-01-01"))),
         sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
   }
+
+  /**
+   * Verifies that rewrite_manifests only rewrites manifests of the requested partition spec when
+   * {@code spec_id} is passed, leaving manifests of other specs and the table data untouched.
+   */
+  @Test
+  public void testWriteManifestWithSpecId() {
+    sql(
+        "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
+        tableName);
+    // Disable manifest merging so that every INSERT below leaves its own manifest behind.
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);
+
+    sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
+    sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0",
+        ImmutableList.of(row(0), row(0)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName);
+    sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 3 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    // Without spec_id nothing is rewritten here -- presumably because the action defaults to the
+    // table's current spec (1), which owns only a single manifest.
+    List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
+    assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);
+
+    // Targeting spec 0 should compact its two manifests into one.
+    output =
+        sql(
+            "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
+            catalogName, tableIdent);
+    assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    // Rewriting manifests is a metadata-only operation and must not change the table contents.
+    assertEquals(
+        "Table data should be unchanged after rewriting manifests",
+        ImmutableList.of(
+            row(1, "2024-01-01", "00"), row(2, "2024-01-01", "00"), row(3, "2024-01-01", "00")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index c8becc7e5a..e59077ae3d 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
+        ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
+        ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
       };
 
   // counts are not nullable since the action result is never null
@@ -85,6 +86,7 @@ class RewriteManifestsProcedure extends BaseProcedure {
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
     Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
+    Integer specId = args.isNullAt(2) ? null : args.getInt(2);
 
     return modifyIcebergTable(
         tableIdent,
@@ -95,6 +97,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
             action.option(RewriteManifestsSparkAction.USE_CACHING, 
useCaching.toString());
           }
 
+          if (specId != null) {
+            action.specId(specId);
+          }
+
           RewriteManifests.Result result = action.execute();
 
           return toOutputRows(result);
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index dc22fc4b70..8e64eaad44 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -331,4 +331,39 @@ public class TestRewriteManifestsProcedure extends 
SparkExtensionsTestBase {
             row(1, Timestamp.valueOf("2022-01-01 10:00:00"), 
Date.valueOf("2022-01-01"))),
         sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
   }
+
+  /**
+   * Verifies that rewrite_manifests only rewrites manifests of the requested partition spec when
+   * {@code spec_id} is passed, leaving manifests of other specs and the table data untouched.
+   */
+  @Test
+  public void testWriteManifestWithSpecId() {
+    sql(
+        "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
+        tableName);
+    // Disable manifest merging so that every INSERT below leaves its own manifest behind.
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);
+
+    sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
+    sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0",
+        ImmutableList.of(row(0), row(0)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName);
+    sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
+    assertEquals(
+        "Should have 3 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    // Without spec_id nothing is rewritten here -- presumably because the action defaults to the
+    // table's current spec (1), which owns only a single manifest.
+    List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
+    assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);
+
+    // Targeting spec 0 should compact its two manifests into one.
+    output =
+        sql(
+            "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
+            catalogName, tableIdent);
+    assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
+    assertEquals(
+        "Should have 2 manifests and their partition spec id should be 0 and 1",
+        ImmutableList.of(row(0), row(1)),
+        sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
+
+    // Rewriting manifests is a metadata-only operation and must not change the table contents.
+    assertEquals(
+        "Table data should be unchanged after rewriting manifests",
+        ImmutableList.of(
+            row(1, "2024-01-01", "00"), row(2, "2024-01-01", "00"), row(3, "2024-01-01", "00")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index c8becc7e5a..e59077ae3d 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
-        ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
+        ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
+        ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
       };
 
   // counts are not nullable since the action result is never null
@@ -85,6 +86,7 @@ class RewriteManifestsProcedure extends BaseProcedure {
   public InternalRow[] call(InternalRow args) {
     Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
     Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
+    Integer specId = args.isNullAt(2) ? null : args.getInt(2);
 
     return modifyIcebergTable(
         tableIdent,
@@ -95,6 +97,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
             action.option(RewriteManifestsSparkAction.USE_CACHING, 
useCaching.toString());
           }
 
+          if (specId != null) {
+            action.specId(specId);
+          }
+
           RewriteManifests.Result result = action.execute();
 
           return toOutputRows(result);

Reply via email to