This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 818b2168ba Spark 4.0: Add procedure to compute partition stats (#13523)
818b2168ba is described below

commit 818b2168ba569df590666305456a53aad84c88a1
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jul 11 12:32:44 2025 +0530

    Spark 4.0: Add procedure to compute partition stats (#13523)
---
 .../TestComputePartitionStatsProcedure.java        | 121 ++++++++++++++++++++
 .../procedures/ComputePartitionStatsProcedure.java | 126 +++++++++++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 3 files changed, 248 insertions(+)
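
For context, a minimal sketch of invoking the new procedure from Spark SQL, mirroring the calls exercised in the test below; an active SparkSession "spark" is assumed, and the catalog name "spark_catalog", table name "db.tbl", and snapshot ID 42 are placeholders:

    // Positional argument: compute partition stats for the table's current snapshot.
    spark.sql("CALL spark_catalog.system.compute_partition_stats('db.tbl')");

    // Named arguments: compute partition stats as of a specific snapshot.
    spark.sql(
        "CALL spark_catalog.system.compute_partition_stats("
            + "table => 'db.tbl', snapshot_id => 42)");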

diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
new file mode 100644
index 0000000000..37423fc147
--- /dev/null
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestComputePartitionStatsProcedure extends ExtensionsTestBase {
+
+  @AfterEach
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void procedureOnEmptyTable() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)",
+        tableName);
+    List<Object[]> result =
+        sql("CALL %s.system.compute_partition_stats('%s')", catalogName, 
tableIdent);
+    assertThat(result).isEmpty();
+  }
+
+  @TestTemplate
+  public void procedureWithPositionalArgs() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", 
tableName);
+    List<Object[]> output =
+        sql("CALL %s.system.compute_partition_stats('%s')", catalogName, 
tableIdent);
+    assertThat(output.get(0)).isNotEmpty();
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    assertThat(table.partitionStatisticsFiles()).hasSize(1);
+    PartitionStatisticsFile statisticsFile = table.partitionStatisticsFiles().get(0);
+    assertThat(statisticsFile.path()).isEqualTo(output.get(0)[0].toString());
+    assertThat(statisticsFile.snapshotId()).isEqualTo(table.currentSnapshot().snapshotId());
+    assertThat(new File(statisticsFile.path().replace("file:", ""))).exists();
+  }
+
+  @TestTemplate
+  public void procedureWithNamedArgs() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)",
+        tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", 
tableName);
+    sql("ALTER TABLE %s CREATE BRANCH `b1`", tableName);
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    long branchSnapshotId = table.refs().get("b1").snapshotId();
+    sql("INSERT INTO TABLE %s VALUES (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')", 
tableName);
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.compute_partition_stats(table => '%s', snapshot_id 
=> %s)",
+            catalogName, tableIdent, branchSnapshotId);
+    table.refresh();
+    assertThat(table.partitionStatisticsFiles()).hasSize(1);
+    PartitionStatisticsFile statisticsFile = table.partitionStatisticsFiles().get(0);
+    assertThat(statisticsFile.path()).isEqualTo(output.get(0)[0].toString());
+    // should be from the branch's snapshot instead of latest snapshot of the table
+    assertThat(statisticsFile.snapshotId()).isEqualTo(branchSnapshotId);
+    assertThat(new File(statisticsFile.path().replace("file:", ""))).exists();
+  }
+
+  @TestTemplate
+  public void procedureWithInvalidSnapshotId() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
PARTITIONED BY (data)",
+        tableName);
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_partition_stats(table => '%s', 
snapshot_id => 42)",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Snapshot not found: 42");
+  }
+
+  @TestTemplate
+  public void procedureWithInvalidTable() {
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_partition_stats(table => '%s')",
+                    catalogName, TableIdentifier.of(Namespace.of("default"), "abcd")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Couldn't load table");
+  }
+}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
new file mode 100644
index 0000000000..08c87013b2
--- /dev/null
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Iterator;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputePartitionStats;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.procedures.BoundProcedure;
+import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A procedure that computes the stats incrementally from the last snapshot that has partition stats
+ * file until the given snapshot (uses current snapshot if not specified) and writes the combined
+ * result into a {@link PartitionStatisticsFile} after merging the partition stats. Does a full
+ * compute if previous statistics file does not exist. Also registers the {@link
+ * PartitionStatisticsFile} to table metadata.
+ *
+ * @see SparkActions#computePartitionStats(Table)
+ */
+public class ComputePartitionStatsProcedure extends BaseProcedure {
+
+  static final String NAME = "compute_partition_stats";
+
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      optionalInParameter("snapshot_id", DataTypes.LongType);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "partition_statistics_file", DataTypes.StringType, true, 
Metadata.empty())
+          });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<ComputePartitionStatsProcedure>() {
+      @Override
+      protected ComputePartitionStatsProcedure doBuild() {
+        return new ComputePartitionStatsProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private ComputePartitionStatsProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public BoundProcedure bind(StructType inputType) {
+    return this;
+  }
+
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public Iterator<Scan> call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          ComputePartitionStats action = actions().computePartitionStats(table);
+          if (snapshotId != null) {
+            action.snapshot(snapshotId);
+          }
+
+          return asScanIterator(OUTPUT_TYPE, toOutputRows(action.execute()));
+        });
+  }
+
+  private InternalRow[] toOutputRows(ComputePartitionStats.Result result) {
+    PartitionStatisticsFile statisticsFile = result.statisticsFile();
+    if (statisticsFile != null) {
+      InternalRow row = newInternalRow(UTF8String.fromString(statisticsFile.path()));
+      return new InternalRow[] {row};
+    } else {
+      return new InternalRow[0];
+    }
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public String description() {
+    return "ComputePartitionStatsProcedure";
+  }
+}
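
For context, a minimal sketch of the action-level API this procedure delegates to (per the @see reference in the Javadoc above), assuming an active SparkSession "spark", a loaded Iceberg Table "table", and a valid "snapshotId":

    // Configure the action; when snapshot(...) is omitted, the current snapshot is used.
    ComputePartitionStats action = SparkActions.get(spark).computePartitionStats(table);
    action.snapshot(snapshotId);
    ComputePartitionStats.Result result = action.execute();
    // statisticsFile() may be null (e.g. for a table with no snapshots), in which
    // case the procedure surfaces no rows, as the empty-table test above shows.
    PartitionStatisticsFile statsFile = result.statisticsFile();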
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 26274e84f2..6b42a04421 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -58,6 +58,7 @@ public class SparkProcedures {
         RewritePositionDeleteFilesProcedure.NAME, RewritePositionDeleteFilesProcedure::builder);
     mapBuilder.put(FastForwardBranchProcedure.NAME, FastForwardBranchProcedure::builder);
     mapBuilder.put(ComputeTableStatsProcedure.NAME, ComputeTableStatsProcedure::builder);
+    mapBuilder.put(ComputePartitionStatsProcedure.NAME, ComputePartitionStatsProcedure::builder);
     mapBuilder.put(RewriteTablePathProcedure.NAME, RewriteTablePathProcedure::builder);
     return mapBuilder.build();
   }
