This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e9da855b94 Spark 3.5: Add procedure to compute partition stats (#13480)
e9da855b94 is described below
commit e9da855b94488e5090c9b437da62735d3eac1485
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jul 11 11:36:56 2025 +0530
Spark 3.5: Add procedure to compute partition stats (#13480)
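The procedure can be called with positional or named arguments, as the new
tests exercise it; a usage sketch (catalog name, table name, and snapshot id
below are placeholders):

    CALL catalog.system.compute_partition_stats('db.tbl');
    CALL catalog.system.compute_partition_stats(table => 'db.tbl', snapshot_id => 1234567890);

It returns the path of the written partition statistics file, or no rows for
an empty table.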
---
.../TestComputePartitionStatsProcedure.java | 121 +++++++++++++++++++++
.../procedures/ComputePartitionStatsProcedure.java | 118 ++++++++++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
3 files changed, 240 insertions(+)
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
new file mode 100644
index 0000000000..37423fc147
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestComputePartitionStatsProcedure extends ExtensionsTestBase {
+
+ @AfterEach
+ public void removeTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @TestTemplate
+ public void procedureOnEmptyTable() {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)",
+ tableName);
+ List<Object[]> result =
+ sql("CALL %s.system.compute_partition_stats('%s')", catalogName,
tableIdent);
+ assertThat(result).isEmpty();
+ }
+
+ @TestTemplate
+ public void procedureWithPositionalArgs() throws NoSuchTableException, ParseException {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)",
+ tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
tableName);
+ List<Object[]> output =
+ sql("CALL %s.system.compute_partition_stats('%s')", catalogName,
tableIdent);
+ assertThat(output.get(0)).isNotEmpty();
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ assertThat(table.partitionStatisticsFiles()).hasSize(1);
+ PartitionStatisticsFile statisticsFile = table.partitionStatisticsFiles().get(0);
+ assertThat(statisticsFile.path()).isEqualTo(output.get(0)[0].toString());
+ assertThat(statisticsFile.snapshotId()).isEqualTo(table.currentSnapshot().snapshotId());
+ assertThat(new File(statisticsFile.path().replace("file:", ""))).exists();
+ }
+
+ @TestTemplate
+ public void procedureWithNamedArgs() throws NoSuchTableException, ParseException {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)",
+ tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
tableName);
+ sql("ALTER TABLE %s CREATE BRANCH `b1`", tableName);
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ long branchSnapshotId = table.refs().get("b1").snapshotId();
+ sql("INSERT INTO TABLE %s VALUES (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')",
tableName);
+
+ List<Object[]> output =
+ sql(
+ "CALL %s.system.compute_partition_stats(table => '%s', snapshot_id
=> %s)",
+ catalogName, tableIdent, branchSnapshotId);
+ table.refresh();
+ assertThat(table.partitionStatisticsFiles()).hasSize(1);
+ PartitionStatisticsFile statisticsFile = table.partitionStatisticsFiles().get(0);
+ assertThat(statisticsFile.path()).isEqualTo(output.get(0)[0].toString());
+ // should be from the branch's snapshot instead of the latest snapshot of the table
+ assertThat(statisticsFile.snapshotId()).isEqualTo(branchSnapshotId);
+ assertThat(new File(statisticsFile.path().replace("file:", ""))).exists();
+ }
+
+ @TestTemplate
+ public void procedureWithInvalidSnapshotId() {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg
PARTITIONED BY (data)",
+ tableName);
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.compute_partition_stats(table => '%s',
snapshot_id => 42)",
+ catalogName, tableIdent))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Snapshot not found: 42");
+ }
+
+ @TestTemplate
+ public void procedureWithInvalidTable() {
+ assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.compute_partition_stats(table => '%s')",
+ catalogName, TableIdentifier.of(Namespace.of("default"), "abcd")))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("Couldn't load table");
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
new file mode 100644
index 0000000000..c82cbbc216
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputePartitionStats;
+import org.apache.iceberg.actions.ComputePartitionStats.Result;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A procedure that computes partition stats incrementally, from the last snapshot that has a
+ * partition stats file up to the given snapshot (the current snapshot if none is specified), and
+ * writes the combined result into a {@link PartitionStatisticsFile} after merging the partition
+ * stats. Performs a full computation if no previous statistics file exists. Also registers the
+ * {@link PartitionStatisticsFile} in the table metadata.
+ *
+ * @see SparkActions#computePartitionStats(Table)
+ */
+public class ComputePartitionStatsProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter TABLE_PARAM =
+ ProcedureParameter.required("table", DataTypes.StringType);
+ private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+ ProcedureParameter.optional("snapshot_id", DataTypes.LongType);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField(
+ "partition_statistics_file", DataTypes.StringType, true,
Metadata.empty())
+ });
+
+ public static ProcedureBuilder builder() {
+ return new Builder<ComputePartitionStatsProcedure>() {
+ @Override
+ protected ComputePartitionStatsProcedure doBuild() {
+ return new ComputePartitionStatsProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private ComputePartitionStatsProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+
+ return modifyIcebergTable(
+ tableIdent,
+ table -> {
+ ComputePartitionStats action = actions().computePartitionStats(table);
+ if (snapshotId != null) {
+ action.snapshot(snapshotId);
+ }
+
+ return toOutputRows(action.execute());
+ });
+ }
+
+ private InternalRow[] toOutputRows(Result result) {
+ PartitionStatisticsFile statisticsFile = result.statisticsFile();
+ if (statisticsFile != null) {
+ InternalRow row = newInternalRow(UTF8String.fromString(statisticsFile.path()));
+ return new InternalRow[] {row};
+ } else {
+ return new InternalRow[0];
+ }
+ }
+
+ @Override
+ public String description() {
+ return "ComputePartitionStatsProcedure";
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 3539704430..82f44996c8 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -62,6 +62,7 @@ public class SparkProcedures {
mapBuilder.put("rewrite_position_delete_files",
RewritePositionDeleteFilesProcedure::builder);
mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
+ mapBuilder.put("compute_partition_stats",
ComputePartitionStatsProcedure::builder);
mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
return mapBuilder.build();
}
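For reference, the procedure delegates to the partition stats action noted in
its Javadoc, so the same computation can be run through the Java API. A
minimal sketch, assuming a live SparkSession named `spark` and the SparkActions
entry point; the table name is a placeholder and checked exceptions from table
loading are omitted:

    import org.apache.iceberg.PartitionStatisticsFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ComputePartitionStats;
    import org.apache.iceberg.spark.Spark3Util;
    import org.apache.iceberg.spark.actions.SparkActions;

    // load the table and compute stats up to its current snapshot
    Table table = Spark3Util.loadIcebergTable(spark, "db.tbl");
    ComputePartitionStats.Result result =
        SparkActions.get(spark).computePartitionStats(table)
            // optionally pin an older snapshot: .snapshot(snapshotId)
            .execute();
    // null when there was nothing to compute, e.g. an empty table
    PartitionStatisticsFile statsFile = result.statisticsFile();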