This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 818b2168ba Spark 4.0: Add procedure to compute partition stats (#13523)
818b2168ba is described below
commit 818b2168ba569df590666305456a53aad84c88a1
Author: Ajantha Bhat <[email protected]>
AuthorDate: Fri Jul 11 12:32:44 2025 +0530
Spark 4.0: Add procedure to compute partition stats (#13523)
---
.../TestComputePartitionStatsProcedure.java | 121 ++++++++++++++++++++
.../procedures/ComputePartitionStatsProcedure.java | 126 +++++++++++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
3 files changed, 248 insertions(+)
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
new file mode 100644
index 0000000000..37423fc147
--- /dev/null
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestComputePartitionStatsProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Exercises the {@code compute_partition_stats} procedure through Spark SQL {@code CALL}
+ * statements, using both positional and named arguments as well as invalid inputs.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestComputePartitionStatsProcedure extends ExtensionsTestBase {
+
+  /** Drops the table used by a test so that every test starts without it. */
+  @AfterEach
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  /** A table without any data must yield an empty procedure result. */
+  @TestTemplate
+  public void procedureOnEmptyTable() {
+    createPartitionedTable();
+
+    List<Object[]> rows =
+        sql("CALL %s.system.compute_partition_stats('%s')", catalogName, tableIdent);
+
+    assertThat(rows).isEmpty();
+  }
+
+  /** Passing only the table positionally must produce stats for the current snapshot. */
+  @TestTemplate
+  public void procedureWithPositionalArgs() throws NoSuchTableException, ParseException {
+    createPartitionedTable();
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+
+    List<Object[]> rows =
+        sql("CALL %s.system.compute_partition_stats('%s')", catalogName, tableIdent);
+    assertThat(rows.get(0)).isNotEmpty();
+
+    Table icebergTable = Spark3Util.loadIcebergTable(spark, tableName);
+    assertRegisteredStatsFile(icebergTable, rows, icebergTable.currentSnapshot().snapshotId());
+  }
+
+  /** Passing {@code snapshot_id} by name must produce stats for exactly that snapshot. */
+  @TestTemplate
+  public void procedureWithNamedArgs() throws NoSuchTableException, ParseException {
+    createPartitionedTable();
+    sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
+    sql("ALTER TABLE %s CREATE BRANCH `b1`", tableName);
+    Table icebergTable = Spark3Util.loadIcebergTable(spark, tableName);
+    long branchSnapshotId = icebergTable.refs().get("b1").snapshotId();
+    // move the table's latest snapshot past the branch so that the two snapshot ids differ
+    sql("INSERT INTO TABLE %s VALUES (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')", tableName);
+
+    List<Object[]> rows =
+        sql(
+            "CALL %s.system.compute_partition_stats(table => '%s', snapshot_id => %s)",
+            catalogName, tableIdent, branchSnapshotId);
+
+    icebergTable.refresh();
+    // the stats must belong to the branch's snapshot rather than the table's latest snapshot
+    assertRegisteredStatsFile(icebergTable, rows, branchSnapshotId);
+  }
+
+  /** A snapshot id that does not exist in the table must be rejected. */
+  @TestTemplate
+  public void procedureWithInvalidSnapshotId() {
+    createPartitionedTable();
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_partition_stats(table => '%s', snapshot_id => 42)",
+                    catalogName, tableIdent))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Snapshot not found: 42");
+  }
+
+  /** A table that was never created must make the procedure call fail. */
+  @TestTemplate
+  public void procedureWithInvalidTable() {
+    TableIdentifier missingTable = TableIdentifier.of(Namespace.of("default"), "abcd");
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.compute_partition_stats(table => '%s')",
+                    catalogName, missingTable))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining("Couldn't load table");
+  }
+
+  /** Creates the Iceberg table shared by the tests, partitioned by the {@code data} column. */
+  private void createPartitionedTable() {
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
+        tableName);
+  }
+
+  /**
+   * Asserts that the table has exactly one partition statistics file, that its path is the one
+   * returned in the first column of the first procedure output row, that it belongs to the
+   * expected snapshot and that the file exists on the local file system.
+   *
+   * @param icebergTable table whose registered partition statistics files are inspected
+   * @param rows rows returned by the procedure call
+   * @param expectedSnapshotId snapshot the statistics file must have been computed for
+   */
+  private static void assertRegisteredStatsFile(
+      Table icebergTable, List<Object[]> rows, long expectedSnapshotId) {
+    List<PartitionStatisticsFile> statsFiles = icebergTable.partitionStatisticsFiles();
+    assertThat(statsFiles).hasSize(1);
+
+    PartitionStatisticsFile statsFile = statsFiles.get(0);
+    assertThat(statsFile.path()).isEqualTo(rows.get(0)[0].toString());
+    assertThat(statsFile.snapshotId()).isEqualTo(expectedSnapshotId);
+
+    // drop the "file:" scheme so that the location can be checked through java.io.File
+    String localPath = statsFile.path().replace("file:", "");
+    assertThat(new File(localPath)).exists();
+  }
+}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
new file mode 100644
index 0000000000..08c87013b2
--- /dev/null
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ComputePartitionStatsProcedure.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Iterator;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.ComputePartitionStats;
+import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.procedures.BoundProcedure;
+import org.apache.spark.sql.connector.catalog.procedures.ProcedureParameter;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A procedure that computes the stats incrementally from the last snapshot that has partition stats
+ * file until the given snapshot (uses current snapshot if not specified) and writes the combined
+ * result into a {@link PartitionStatisticsFile} after merging the partition stats. Does a full
+ * compute if previous statistics file does not exist. Also registers the {@link
+ * PartitionStatisticsFile} to table metadata.
+ *
+ * <p>Arguments:
+ *
+ * <ul>
+ *   <li>{@code table} (required, string): identifier of the table to compute stats for
+ *   <li>{@code snapshot_id} (optional, long): snapshot to compute stats for
+ * </ul>
+ *
+ * <p>Output: a single nullable string column {@code partition_statistics_file} that holds the path
+ * of the written statistics file. No row is returned when the action reports no statistics file.
+ *
+ * <p>Example calls:
+ *
+ * <pre>{@code
+ * CALL catalog.system.compute_partition_stats('db.tbl')
+ * CALL catalog.system.compute_partition_stats(table => 'db.tbl', snapshot_id => 42)
+ * }</pre>
+ *
+ * @see SparkActions#computePartitionStats(Table)
+ */
+public class ComputePartitionStatsProcedure extends BaseProcedure {
+
+  /** Name under which the procedure is registered and invoked from SQL. */
+  static final String NAME = "compute_partition_stats";
+
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      optionalInParameter("snapshot_id", DataTypes.LongType);
+
+  // the array order defines the positional argument order: table first, then snapshot_id
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "partition_statistics_file", DataTypes.StringType, true, Metadata.empty())
+          });
+
+  /**
+   * Returns a builder that creates this procedure for the table catalog configured on the builder.
+   *
+   * @return a builder for {@link ComputePartitionStatsProcedure}
+   */
+  public static ProcedureBuilder builder() {
+    return new Builder<ComputePartitionStatsProcedure>() {
+      @Override
+      protected ComputePartitionStatsProcedure doBuild() {
+        return new ComputePartitionStatsProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private ComputePartitionStatsProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /**
+   * Binds the procedure to the given input type.
+   *
+   * @param inputType type of the call arguments; not inspected by this procedure
+   * @return this instance, which also serves as the bound procedure
+   */
+  @Override
+  public BoundProcedure bind(StructType inputType) {
+    return this;
+  }
+
+  /**
+   * Returns the parameters accepted by this procedure: the required {@code table} followed by the
+   * optional {@code snapshot_id}.
+   *
+   * @return the parameter definitions in positional order
+   */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /**
+   * Runs the compute partition stats action for the requested table and, if given, snapshot.
+   *
+   * @param args the call arguments, laid out according to {@link #parameters()}
+   * @return an iterator over a scan of the output rows: one row with the statistics file path, or
+   *     no rows when the action reports no statistics file
+   */
+  @Override
+  public Iterator<Scan> call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    // null is passed as the fallback so that an omitted snapshot_id can be detected below
+    Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          ComputePartitionStats action = actions().computePartitionStats(table);
+          if (snapshotId != null) {
+            action.snapshot(snapshotId);
+          }
+
+          return asScanIterator(OUTPUT_TYPE, toOutputRows(action.execute()));
+        });
+  }
+
+  /**
+   * Converts the action result into procedure output rows.
+   *
+   * @param result result of the compute partition stats action
+   * @return a single row holding the statistics file path, or an empty array when the result has
+   *     no statistics file
+   */
+  private InternalRow[] toOutputRows(ComputePartitionStats.Result result) {
+    PartitionStatisticsFile statisticsFile = result.statisticsFile();
+    if (statisticsFile != null) {
+      InternalRow row = newInternalRow(UTF8String.fromString(statisticsFile.path()));
+      return new InternalRow[] {row};
+    } else {
+      // report "nothing written" as an empty result instead of a row that contains null
+      return new InternalRow[0];
+    }
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public String description() {
+    return "ComputePartitionStatsProcedure";
+  }
+}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 26274e84f2..6b42a04421 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -58,6 +58,7 @@ public class SparkProcedures {
RewritePositionDeleteFilesProcedure.NAME,
RewritePositionDeleteFilesProcedure::builder);
mapBuilder.put(FastForwardBranchProcedure.NAME,
FastForwardBranchProcedure::builder);
mapBuilder.put(ComputeTableStatsProcedure.NAME,
ComputeTableStatsProcedure::builder);
+ mapBuilder.put(ComputePartitionStatsProcedure.NAME,
ComputePartitionStatsProcedure::builder);
mapBuilder.put(RewriteTablePathProcedure.NAME,
RewriteTablePathProcedure::builder);
return mapBuilder.build();
}