This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 51d548a4fc API, Core: Scan API for partition stats (#14640)
51d548a4fc is described below
commit 51d548a4fc101df3494e17a5f7ff5c615c5979cc
Author: gaborkaszab <[email protected]>
AuthorDate: Wed Jan 7 13:58:23 2026 +0100
API, Core: Scan API for partition stats (#14640)
---
.../org/apache/iceberg/PartitionStatistics.java | 65 +++
.../apache/iceberg/PartitionStatisticsScan.java | 59 +++
api/src/main/java/org/apache/iceberg/Table.java | 12 +
.../apache/iceberg/BasePartitionStatistics.java | 201 +++++++++
.../iceberg/BasePartitionStatisticsScan.java | 86 ++++
.../main/java/org/apache/iceberg/BaseTable.java | 5 +
.../java/org/apache/iceberg/PartitionStats.java | 6 +
.../org/apache/iceberg/PartitionStatsHandler.java | 2 +
.../iceberg/PartitionStatisticsScanTestBase.java | 480 +++++++++++++++++++++
.../iceberg/PartitionStatisticsTestBase.java | 114 +++++
.../iceberg/PartitionStatsHandlerTestBase.java | 112 +----
.../avro/TestAvroPartitionStatisticsScan.java | 29 ++
.../orc/TestOrcPartitionStatisticsScan.java | 59 +++
.../TestParquetPartitionStatisticsScan.java | 30 ++
14 files changed, 1164 insertions(+), 96 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
new file mode 100644
index 0000000000..10df7303d5
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** Interface for partition statistics returned from a {@link
PartitionStatisticsScan}. */
+public interface PartitionStatistics extends StructLike {
+
+ /** Returns the partition of these partition statistics */
+ StructLike partition();
+
+ /** Returns the spec ID of the partition of these partition statistics */
+ Integer specId();
+
+ /** Returns the number of data records in the partition */
+ Long dataRecordCount();
+
+ /** Returns the number of data files in the partition */
+ Integer dataFileCount();
+
+ /** Returns the total size of data files in bytes in the partition */
+ Long totalDataFileSizeInBytes();
+
+ /**
+ * Returns the number of positional delete records in the partition. Also
includes dv record count
+ * as per spec
+ */
+ Long positionDeleteRecordCount();
+
+ /** Returns the number of positional delete files in the partition */
+ Integer positionDeleteFileCount();
+
+ /** Returns the number of equality delete records in the partition */
+ Long equalityDeleteRecordCount();
+
+ /** Returns the number of equality delete files in the partition */
+ Integer equalityDeleteFileCount();
+
+ /** Returns the total number of records in the partition */
+ Long totalRecords();
+
+ /** Returns the timestamp in milliseconds when the partition was last
updated */
+ Long lastUpdatedAt();
+
+ /** Returns the ID of the snapshot that last updated this partition */
+ Long lastUpdatedSnapshotId();
+
+ /** Returns the number of delete vectors in the partition */
+ Integer dvCount();
+}
diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
new file mode 100644
index 0000000000..18d8b20318
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+/** API for configuring partition statistics scan. */
+public interface PartitionStatisticsScan {
+
+ /**
+ * Create a new scan from this scan's configuration that will use the given
snapshot by ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return a new scan based on this with the given snapshot ID
+ * @throws IllegalArgumentException if the snapshot cannot be found
+ */
+ PartitionStatisticsScan useSnapshot(long snapshotId);
+
+ /**
+ * Create a new scan from the results of this, where partitions are filtered
by the {@link
+ * Expression}.
+ *
+ * @param filter a filter expression
+ * @return a new scan based on this with results filtered by the expression
+ */
+ PartitionStatisticsScan filter(Expression filter);
+
+ /**
+ * Create a new scan from this with the schema as its projection.
+ *
+ * @param schema a projection schema
+ * @return a new scan based on this with the given projection
+ */
+ PartitionStatisticsScan project(Schema schema);
+
+ /**
+ * Scans a partition statistics file belonging to a particular snapshot
+ *
+ * @return an Iterable of partition statistics
+ */
+ CloseableIterable<PartitionStatistics> scan();
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java
b/api/src/main/java/org/apache/iceberg/Table.java
index 97ea9ba765..3c0689e892 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -83,6 +83,18 @@ public interface Table {
throw new UnsupportedOperationException("Incremental changelog scan is not
supported");
}
+ /**
+ * Create a new {@link PartitionStatisticsScan} for this table.
+ *
+ * <p>Once a partition statistics scan is created, it can be refined to
project columns and filter
+ * data.
+ *
+ * @return a partition statistics scan for this table
+ */
+ default PartitionStatisticsScan newPartitionStatisticsScan() {
+ throw new UnsupportedOperationException("Partition statistics scan is not
supported");
+ }
+
/**
* Return the {@link Schema schema} for this table.
*
diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
new file mode 100644
index 0000000000..c17718281b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatistics.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.avro.SupportsIndexProjection;
+import org.apache.iceberg.types.Types;
+
+public class BasePartitionStatistics extends SupportsIndexProjection
+ implements PartitionStatistics {
+
+ private StructLike partition;
+ private Integer specId;
+ private Long dataRecordCount;
+ private Integer dataFileCount;
+ private Long totalDataFileSizeInBytes;
+ private Long positionDeleteRecordCount;
+ private Integer positionDeleteFileCount;
+ private Long equalityDeleteRecordCount;
+ private Integer equalityDeleteFileCount;
+ private Long totalRecordCount; // Not calculated, as it needs scanning the
data. Remains null
+ private Long lastUpdatedAt;
+ private Long lastUpdatedSnapshotId;
+ private Integer dvCount;
+
+ private static final int STATS_COUNT = 13;
+
+ /** Used by internal readers to instantiate this class with a projection
schema. */
+ public BasePartitionStatistics(Types.StructType projection) {
+ super(STATS_COUNT);
+ }
+
+ @Override
+ public StructLike partition() {
+ return partition;
+ }
+
+ @Override
+ public Integer specId() {
+ return specId;
+ }
+
+ @Override
+ public Long dataRecordCount() {
+ return dataRecordCount;
+ }
+
+ @Override
+ public Integer dataFileCount() {
+ return dataFileCount;
+ }
+
+ @Override
+ public Long totalDataFileSizeInBytes() {
+ return totalDataFileSizeInBytes;
+ }
+
+ @Override
+ public Long positionDeleteRecordCount() {
+ return positionDeleteRecordCount;
+ }
+
+ @Override
+ public Integer positionDeleteFileCount() {
+ return positionDeleteFileCount;
+ }
+
+ @Override
+ public Long equalityDeleteRecordCount() {
+ return equalityDeleteRecordCount;
+ }
+
+ @Override
+ public Integer equalityDeleteFileCount() {
+ return equalityDeleteFileCount;
+ }
+
+ @Override
+ public Long totalRecords() {
+ return totalRecordCount;
+ }
+
+ @Override
+ public Long lastUpdatedAt() {
+ return lastUpdatedAt;
+ }
+
+ @Override
+ public Long lastUpdatedSnapshotId() {
+ return lastUpdatedSnapshotId;
+ }
+
+ @Override
+ public Integer dvCount() {
+ return dvCount;
+ }
+
+ @Override
+ protected <T> T internalGet(int pos, Class<T> javaClass) {
+ return javaClass.cast(getByPos(pos));
+ }
+
+ private Object getByPos(int pos) {
+ switch (pos) {
+ case 0:
+ return partition;
+ case 1:
+ return specId;
+ case 2:
+ return dataRecordCount;
+ case 3:
+ return dataFileCount;
+ case 4:
+ return totalDataFileSizeInBytes;
+ case 5:
+ return positionDeleteRecordCount;
+ case 6:
+ return positionDeleteFileCount;
+ case 7:
+ return equalityDeleteRecordCount;
+ case 8:
+ return equalityDeleteFileCount;
+ case 9:
+ return totalRecordCount;
+ case 10:
+ return lastUpdatedAt;
+ case 11:
+ return lastUpdatedSnapshotId;
+ case 12:
+ return dvCount;
+ default:
+ throw new UnsupportedOperationException("Unknown position: " + pos);
+ }
+ }
+
+ @Override
+ protected <T> void internalSet(int pos, T value) {
+ if (value == null) {
+ return;
+ }
+
+ switch (pos) {
+ case 0:
+ this.partition = (StructLike) value;
+ break;
+ case 1:
+ this.specId = (int) value;
+ break;
+ case 2:
+ this.dataRecordCount = (long) value;
+ break;
+ case 3:
+ this.dataFileCount = (int) value;
+ break;
+ case 4:
+ this.totalDataFileSizeInBytes = (long) value;
+ break;
+ case 5:
+ this.positionDeleteRecordCount = (long) value;
+ break;
+ case 6:
+ this.positionDeleteFileCount = (int) value;
+ break;
+ case 7:
+ this.equalityDeleteRecordCount = (long) value;
+ break;
+ case 8:
+ this.equalityDeleteFileCount = (int) value;
+ break;
+ case 9:
+ this.totalRecordCount = (Long) value;
+ break;
+ case 10:
+ this.lastUpdatedAt = (Long) value;
+ break;
+ case 11:
+ this.lastUpdatedSnapshotId = (Long) value;
+ break;
+ case 12:
+ this.dvCount = (int) value;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown position: " + pos);
+ }
+ }
+}
diff --git
a/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
new file mode 100644
index 0000000000..075a1a85d3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Optional;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class BasePartitionStatisticsScan implements PartitionStatisticsScan {
+
+ private final Table table;
+ private Long snapshotId;
+
+ public BasePartitionStatisticsScan(Table table) {
+ this.table = table;
+ }
+
+ @Override
+ public PartitionStatisticsScan useSnapshot(long newSnapshotId) {
+ Preconditions.checkArgument(
+ table.snapshot(newSnapshotId) != null, "Cannot find snapshot with ID
%s", newSnapshotId);
+
+ this.snapshotId = newSnapshotId;
+ return this;
+ }
+
+ @Override
+ public PartitionStatisticsScan filter(Expression newFilter) {
+ throw new UnsupportedOperationException("Filtering is not supported");
+ }
+
+ @Override
+ public PartitionStatisticsScan project(Schema newSchema) {
+ throw new UnsupportedOperationException("Projection is not supported");
+ }
+
+ @Override
+ public CloseableIterable<PartitionStatistics> scan() {
+ if (snapshotId == null) {
+ if (table.currentSnapshot() == null) {
+ return CloseableIterable.empty();
+ }
+
+ snapshotId = table.currentSnapshot().snapshotId();
+ }
+
+ Optional<PartitionStatisticsFile> statsFile =
+ table.partitionStatisticsFiles().stream()
+ .filter(f -> f.snapshotId() == snapshotId)
+ .findFirst();
+
+ if (statsFile.isEmpty()) {
+ return CloseableIterable.empty();
+ }
+
+ Types.StructType partitionType = Partitioning.partitionType(table);
+ Schema schema = PartitionStatsHandler.schema(partitionType,
TableUtil.formatVersion(table));
+
+ FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path());
+    Preconditions.checkArgument(
+        fileFormat != null, "Unable to determine format of file: %s", statsFile.get().path());
+
+ return InternalData.read(fileFormat,
table.io().newInputFile(statsFile.get().path()))
+ .project(schema)
+ .setRootType(BasePartitionStatistics.class)
+ .build();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 23299a962c..c489c3bfb5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -90,6 +90,11 @@ public class BaseTable implements Table, HasTableOperations,
Serializable {
return new BaseIncrementalChangelogScan(this);
}
+ @Override
+ public PartitionStatisticsScan newPartitionStatisticsScan() {
+ return new BasePartitionStatisticsScan(this);
+ }
+
@Override
public Schema schema() {
return ops.current().schema();
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java
b/core/src/main/java/org/apache/iceberg/PartitionStats.java
index 9051c8535c..e8a4e18916 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStats.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java
@@ -20,6 +20,12 @@ package org.apache.iceberg;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+/**
+ * Class to hold partition statistics values.
+ *
+ * @deprecated will be removed in 1.12.0. Use {@link BasePartitionStatistics} instead
+ */
+@Deprecated
public class PartitionStats implements StructLike {
private static final int STATS_COUNT = 13;
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 4e7c1b104e..7259a1f068 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -275,7 +275,9 @@ public class PartitionStatsHandler {
*
* @param schema The {@link Schema} of the partition statistics file.
* @param inputFile An {@link InputFile} pointing to the partition stats
file.
+ * @deprecated will be removed in 1.12.0, use {@link
PartitionStatisticsScan} instead
*/
+ @Deprecated
public static CloseableIterable<PartitionStats> readPartitionStatsFile(
Schema schema, InputFile inputFile) {
Preconditions.checkArgument(schema != null, "Invalid schema: null");
diff --git
a/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java
b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java
new file mode 100644
index 0000000000..89eb70959c
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsScanTestBase.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.groups.Tuple;
+import org.junit.jupiter.api.Test;
+
+public abstract class PartitionStatisticsScanTestBase extends
PartitionStatisticsTestBase {
+
+ public abstract FileFormat format();
+
+ private final Map<String, String> fileFormatProperty =
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name());
+
+ @Test
+ public void testEmptyTable() throws Exception {
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_empty_table"), "scan_empty_table", SCHEMA, SPEC, 2,
fileFormatProperty);
+
+
assertThat(Lists.newArrayList(testTable.newPartitionStatisticsScan().scan())).isEmpty();
+ }
+
+ @Test
+ public void testInvalidSnapshotId() throws Exception {
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_invalid_snapshot"),
+ "scan_invalid_snapshot",
+ SCHEMA,
+ SPEC,
+ 2,
+ fileFormatProperty);
+
+ assertThatThrownBy(() ->
testTable.newPartitionStatisticsScan().useSnapshot(1234L).scan())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot find snapshot with ID 1234");
+ }
+
+ @Test
+ public void testNoStatsForSnapshot() throws Exception {
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_no_stats"), "scan_no_stats", SCHEMA, SPEC, 2,
fileFormatProperty);
+
+ DataFile dataFile =
+ DataFiles.builder(SPEC)
+ .withPath("some_path")
+ .withFileSizeInBytes(15)
+ .withFormat(format())
+ .withRecordCount(1)
+ .build();
+ testTable.newAppend().appendFile(dataFile).commit();
+ long snapshotId = testTable.currentSnapshot().snapshotId();
+
+
assertThat(testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()).isEmpty();
+ }
+
+ @Test
+ public void testReadingStatsWithInvalidSchema() throws Exception {
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_with_old_schema"),
+ "scan_with_old_schema",
+ SCHEMA,
+ spec,
+ 2,
+ fileFormatProperty);
+ Types.StructType partitionType = Partitioning.partitionType(testTable);
+ Schema oldSchema = invalidOldSchema(partitionType);
+
+ // Add a dummy file to the table to have a snapshot
+ DataFile dataFile =
+ DataFiles.builder(spec)
+ .withPath("some_path")
+ .withFileSizeInBytes(15)
+ .withFormat(FileFormat.PARQUET)
+ .withRecordCount(1)
+ .build();
+ testTable.newAppend().appendFile(dataFile).commit();
+ long snapshotId = testTable.currentSnapshot().snapshotId();
+
+ testTable
+ .updatePartitionStatistics()
+ .setPartitionStatistics(
+ PartitionStatsHandler.writePartitionStatsFile(
+ testTable,
+ snapshotId,
+ oldSchema,
+ Collections.singletonList(randomStats(partitionType))))
+ .commit();
+
+ try (CloseableIterable<PartitionStatistics> recordIterator =
+ testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan())
{
+
+ if (format() == FileFormat.PARQUET) {
+ assertThatThrownBy(() -> Lists.newArrayList(recordIterator))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Not a primitive type: struct");
+ } else if (format() == FileFormat.AVRO) {
+ assertThatThrownBy(() -> Lists.newArrayList(recordIterator))
+ .isInstanceOf(ClassCastException.class)
+ .hasMessageContaining("Integer cannot be cast to class
org.apache.iceberg.StructLike");
+ }
+ }
+ }
+
+ @Test
+ public void testV2toV3SchemaEvolution() throws Exception {
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_with_schema_evolution"),
+ "scan_with_schema_evolution",
+ SCHEMA,
+ SPEC,
+ 2,
+ fileFormatProperty);
+
+ // write stats file using v2 schema
+ DataFile dataFile =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("foo", "A"));
+ testTable.newAppend().appendFile(dataFile).commit();
+
+ testTable
+ .updatePartitionStatistics()
+ .setPartitionStatistics(
+ PartitionStatsHandler.computeAndWriteStatsFile(
+ testTable, testTable.currentSnapshot().snapshotId()))
+ .commit();
+
+ Types.StructType partitionSchema = Partitioning.partitionType(testTable);
+
+ // read with v2 schema
+ List<PartitionStatistics> partitionStatsV2;
+ try (CloseableIterable<PartitionStatistics> recordIterator =
+ testTable.newPartitionStatisticsScan().scan()) {
+ partitionStatsV2 = Lists.newArrayList(recordIterator);
+ }
+
+ testTable.updateProperties().set(TableProperties.FORMAT_VERSION,
"3").commit();
+
+ // read with v3 schema
+ List<PartitionStatistics> partitionStatsV3;
+ try (CloseableIterable<PartitionStatistics> recordIterator =
+ testTable.newPartitionStatisticsScan().scan()) {
+ partitionStatsV3 = Lists.newArrayList(recordIterator);
+ }
+
+ assertThat(partitionStatsV2).hasSameSizeAs(partitionStatsV3);
+ Comparator<StructLike> comparator = Comparators.forType(partitionSchema);
+ for (int i = 0; i < partitionStatsV2.size(); i++) {
+ assertThat(isEqual(comparator, partitionStatsV2.get(i),
partitionStatsV3.get(i))).isTrue();
+ }
+ }
+
+ @SuppressWarnings("checkstyle:MethodLength")
+ @Test
+ public void testScanPartitionStatsForCurrentSnapshot() throws Exception {
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_partition_stats"),
+ "scan_partition_stats",
+ SCHEMA,
+ SPEC,
+ 2,
+ fileFormatProperty);
+
+ DataFile dataFile1 =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("foo", "A"));
+ DataFile dataFile2 =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("foo", "B"));
+ DataFile dataFile3 =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("bar", "A"));
+ DataFile dataFile4 =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("bar", "B"));
+
+ for (int i = 0; i < 3; i++) {
+      // append the same four data files three times so each commit creates new manifest files
+ testTable
+ .newAppend()
+ .appendFile(dataFile1)
+ .appendFile(dataFile2)
+ .appendFile(dataFile3)
+ .appendFile(dataFile4)
+ .commit();
+ }
+
+ Snapshot snapshot1 = testTable.currentSnapshot();
+ Schema recordSchema =
PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
+
+ Types.StructType partitionType =
+ recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
+ computeAndValidatePartitionStats(
+ testTable,
+ testTable.currentSnapshot().snapshotId(),
+ Tuple.tuple(
+ partitionRecord(partitionType, "foo", "A"),
+ 0,
+ 3 * dataFile1.recordCount(),
+ 3,
+ 3 * dataFile1.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId(),
+ null),
+ Tuple.tuple(
+ partitionRecord(partitionType, "foo", "B"),
+ 0,
+ 3 * dataFile2.recordCount(),
+ 3,
+ 3 * dataFile2.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId(),
+ null),
+ Tuple.tuple(
+ partitionRecord(partitionType, "bar", "A"),
+ 0,
+ 3 * dataFile3.recordCount(),
+ 3,
+ 3 * dataFile3.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId(),
+ null),
+ Tuple.tuple(
+ partitionRecord(partitionType, "bar", "B"),
+ 0,
+ 3 * dataFile4.recordCount(),
+ 3,
+ 3 * dataFile4.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId(),
+ null));
+
+ DeleteFile posDelete =
+ FileGenerationUtil.generatePositionDeleteFile(testTable,
TestHelpers.Row.of("bar", "A"));
+ testTable.newRowDelta().addDeletes(posDelete).commit();
+ // snapshot2 is unused in the result as same partition was updated by
snapshot4
+
+ DeleteFile eqDelete =
+ FileGenerationUtil.generateEqualityDeleteFile(testTable,
TestHelpers.Row.of("foo", "A"));
+ testTable.newRowDelta().addDeletes(eqDelete).commit();
+ Snapshot snapshot3 = testTable.currentSnapshot();
+
+ testTable.updateProperties().set(TableProperties.FORMAT_VERSION,
"3").commit();
+ DeleteFile dv = FileGenerationUtil.generateDV(testTable, dataFile3);
+ testTable.newRowDelta().addDeletes(dv).commit();
+ Snapshot snapshot4 = testTable.currentSnapshot();
+
+ computeAndValidatePartitionStats(
+ testTable,
+ testTable.currentSnapshot().snapshotId(),
+ Tuple.tuple(
+ partitionRecord(partitionType, "foo", "A"),
+ 0,
+ 3 * dataFile1.recordCount(),
+ 3,
+ 3 * dataFile1.fileSizeInBytes(),
+ 0L,
+ 0,
+ eqDelete.recordCount(),
+ 1,
+ null,
+ snapshot3.timestampMillis(),
+ snapshot3.snapshotId(),
+ 0),
+ Tuple.tuple(
+ partitionRecord(partitionType, "foo", "B"),
+ 0,
+ 3 * dataFile2.recordCount(),
+ 3,
+ 3 * dataFile2.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId(),
+ 0),
+ Tuple.tuple(
+ partitionRecord(partitionType, "bar", "A"),
+ 0,
+ 3 * dataFile3.recordCount(),
+ 3,
+ 3 * dataFile3.fileSizeInBytes(),
+ posDelete.recordCount() + dv.recordCount(),
+ 1,
+ 0L,
+ 0,
+ null,
+ snapshot4.timestampMillis(),
+ snapshot4.snapshotId(),
+ 1), // dv count
+ Tuple.tuple(
+ partitionRecord(partitionType, "bar", "B"),
+ 0,
+ 3 * dataFile4.recordCount(),
+ 3,
+ 3 * dataFile4.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ snapshot1.timestampMillis(),
+ snapshot1.snapshotId(),
+ 0));
+ }
+
+ @Test
+ public void testScanPartitionStatsForOlderSnapshot() throws Exception {
+ Table testTable =
+ TestTables.create(
+ tempDir("scan_older_snapshot"),
+ "scan_older_snapshot",
+ SCHEMA,
+ SPEC,
+ 2,
+ fileFormatProperty);
+
+ DataFile dataFile1 =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("foo", "A"));
+ DataFile dataFile2 =
+ FileGenerationUtil.generateDataFile(testTable,
TestHelpers.Row.of("foo", "B"));
+
+ testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ Snapshot firstSnapshot = testTable.currentSnapshot();
+
+ testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ Schema recordSchema =
PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2);
+
+ Types.StructType partitionType =
+ recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
+
+ computeAndValidatePartitionStats(
+ testTable,
+ firstSnapshot.snapshotId(),
+ Tuple.tuple(
+ partitionRecord(partitionType, "foo", "A"),
+ 0,
+ dataFile1.recordCount(),
+ 1,
+ dataFile1.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ firstSnapshot.timestampMillis(),
+ firstSnapshot.snapshotId(),
+ null),
+ Tuple.tuple(
+ partitionRecord(partitionType, "foo", "B"),
+ 0,
+ dataFile2.recordCount(),
+ 1,
+ dataFile2.fileSizeInBytes(),
+ 0L,
+ 0,
+ 0L,
+ 0,
+ null,
+ firstSnapshot.timestampMillis(),
+ firstSnapshot.snapshotId(),
+ null));
+ }
+
+ private static void computeAndValidatePartitionStats(
+ Table testTable, long snapshotId, Tuple... expectedValues) throws
IOException {
+ PartitionStatisticsFile result =
+ PartitionStatsHandler.computeAndWriteStatsFile(testTable, snapshotId);
+
testTable.updatePartitionStatistics().setPartitionStatistics(result).commit();
+ assertThat(result.snapshotId()).isEqualTo(snapshotId);
+
+ PartitionStatisticsScan statScan = testTable.newPartitionStatisticsScan();
+    if (testTable.currentSnapshot().snapshotId() != snapshotId) {
+      statScan = statScan.useSnapshot(snapshotId);
+    }
+
+ List<PartitionStatistics> partitionStats;
+ try (CloseableIterable<PartitionStatistics> recordIterator =
statScan.scan()) {
+ partitionStats = Lists.newArrayList(recordIterator);
+ }
+
+ assertThat(partitionStats)
+ .extracting(
+ PartitionStatistics::partition,
+ PartitionStatistics::specId,
+ PartitionStatistics::dataRecordCount,
+ PartitionStatistics::dataFileCount,
+ PartitionStatistics::totalDataFileSizeInBytes,
+ PartitionStatistics::positionDeleteRecordCount,
+ PartitionStatistics::positionDeleteFileCount,
+ PartitionStatistics::equalityDeleteRecordCount,
+ PartitionStatistics::equalityDeleteFileCount,
+ PartitionStatistics::totalRecords,
+ PartitionStatistics::lastUpdatedAt,
+ PartitionStatistics::lastUpdatedSnapshotId,
+ PartitionStatistics::dvCount)
+ .containsExactlyInAnyOrder(expectedValues);
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ protected static boolean isEqual(
+ Comparator<StructLike> partitionComparator,
+ PartitionStatistics stats1,
+ PartitionStatistics stats2) {
+ if (stats1 == stats2) {
+ return true;
+ } else if (stats1 == null || stats2 == null) {
+ return false;
+ }
+
+ return partitionComparator.compare(stats1.partition(), stats2.partition())
== 0
+ && Objects.equals(stats1.specId(), stats2.specId())
+ && Objects.equals(stats1.dataRecordCount(), stats2.dataRecordCount())
+ && Objects.equals(stats1.dataFileCount(), stats2.dataFileCount())
+ && Objects.equals(stats1.totalDataFileSizeInBytes(),
stats2.totalDataFileSizeInBytes())
+ && Objects.equals(stats1.positionDeleteRecordCount(),
stats2.positionDeleteRecordCount())
+ && Objects.equals(stats1.positionDeleteFileCount(),
stats2.positionDeleteFileCount())
+ && Objects.equals(stats1.equalityDeleteRecordCount(),
stats2.equalityDeleteRecordCount())
+ && Objects.equals(stats1.equalityDeleteFileCount(),
stats2.equalityDeleteFileCount())
+ && Objects.equals(stats1.totalRecords(), stats2.totalRecords())
+ && Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt())
+ && Objects.equals(stats1.lastUpdatedSnapshotId(),
stats2.lastUpdatedSnapshotId());
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
new file mode 100644
index 0000000000..72a5405d7f
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatisticsTestBase.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT;
+import static
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
+import static
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT;
+import static
org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
+import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME;
+import static
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
+import static
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
+import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID;
+import static
org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
+import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class PartitionStatisticsTestBase {
+
+ @TempDir private File temp;
+
+ // positions in StructLike
+ protected static final int DATA_RECORD_COUNT_POSITION = 2;
+ protected static final int DATA_FILE_COUNT_POSITION = 3;
+ protected static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
+ protected static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
+ protected static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
+ protected static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
+ protected static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
+ protected static final int TOTAL_RECORD_COUNT_POSITION = 9;
+ protected static final int LAST_UPDATED_AT_POSITION = 10;
+ protected static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
+ protected static final int DV_COUNT_POSITION = 12;
+
+ protected static final Schema SCHEMA =
+ new Schema(
+ optional(1, "c1", Types.IntegerType.get()),
+ optional(2, "c2", Types.StringType.get()),
+ optional(3, "c3", Types.StringType.get()));
+
+ protected static final PartitionSpec SPEC =
+ PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
+
+ private static final Random RANDOM = ThreadLocalRandom.current();
+
+ protected Schema invalidOldSchema(Types.StructType unifiedPartitionType) {
+ // field ids start from 0 instead of 1
+ return new Schema(
+ Types.NestedField.required(0, PARTITION_FIELD_NAME,
unifiedPartitionType),
+ Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()),
+ Types.NestedField.required(2, DATA_RECORD_COUNT.name(),
Types.LongType.get()),
+ Types.NestedField.required(3, DATA_FILE_COUNT.name(),
Types.IntegerType.get()),
+ Types.NestedField.required(4, TOTAL_DATA_FILE_SIZE_IN_BYTES.name(),
Types.LongType.get()),
+ Types.NestedField.optional(5, POSITION_DELETE_RECORD_COUNT.name(),
Types.LongType.get()),
+ Types.NestedField.optional(6, POSITION_DELETE_FILE_COUNT.name(),
Types.IntegerType.get()),
+ Types.NestedField.optional(7, EQUALITY_DELETE_RECORD_COUNT.name(),
Types.LongType.get()),
+ Types.NestedField.optional(8, EQUALITY_DELETE_FILE_COUNT.name(),
Types.IntegerType.get()),
+ Types.NestedField.optional(9, TOTAL_RECORD_COUNT.name(),
Types.LongType.get()),
+ Types.NestedField.optional(10, LAST_UPDATED_AT.name(),
Types.LongType.get()),
+ Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(),
Types.LongType.get()));
+ }
+
+ protected PartitionStats randomStats(Types.StructType partitionType) {
+ PartitionData partitionData = new PartitionData(partitionType);
+ partitionData.set(0, RANDOM.nextInt());
+
+ return randomStats(partitionData);
+ }
+
+ protected PartitionStats randomStats(PartitionData partitionData) {
+ PartitionStats stats = new PartitionStats(partitionData,
RANDOM.nextInt(10));
+ stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
+ stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
+ stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L *
RANDOM.nextInt(20));
+ return stats;
+ }
+
+ protected File tempDir(String folderName) throws IOException {
+ return java.nio.file.Files.createTempDirectory(temp.toPath(),
folderName).toFile();
+ }
+
+ protected static StructLike partitionRecord(
+ Types.StructType partitionType, String val1, String val2) {
+ GenericRecord record = GenericRecord.create(partitionType);
+ record.set(0, val1);
+ record.set(1, val2);
+ return record;
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index 71fdc9507d..9b93013a9b 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -18,25 +18,12 @@
*/
package org.apache.iceberg;
-import static org.apache.iceberg.PartitionStatsHandler.DATA_FILE_COUNT;
-import static org.apache.iceberg.PartitionStatsHandler.DATA_RECORD_COUNT;
-import static
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
-import static
org.apache.iceberg.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_AT;
-import static
org.apache.iceberg.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_ID;
-import static org.apache.iceberg.PartitionStatsHandler.PARTITION_FIELD_NAME;
-import static
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
-import static
org.apache.iceberg.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.PartitionStatsHandler.SPEC_ID;
-import static
org.apache.iceberg.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
-import static org.apache.iceberg.PartitionStatsHandler.TOTAL_RECORD_COUNT;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -46,10 +33,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -61,10 +45,9 @@ import org.assertj.core.groups.Tuple;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
@ExtendWith(ParameterizedTestExtension.class)
-public abstract class PartitionStatsHandlerTestBase {
+public abstract class PartitionStatsHandlerTestBase extends
PartitionStatisticsTestBase {
public abstract FileFormat format();
@@ -75,35 +58,9 @@ public abstract class PartitionStatsHandlerTestBase {
@Parameter protected int formatVersion;
- private static final Schema SCHEMA =
- new Schema(
- optional(1, "c1", Types.IntegerType.get()),
- optional(2, "c2", Types.StringType.get()),
- optional(3, "c3", Types.StringType.get()));
-
- protected static final PartitionSpec SPEC =
- PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
-
- @TempDir public File temp;
-
- private static final Random RANDOM = ThreadLocalRandom.current();
-
private final Map<String, String> fileFormatProperty =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format().name());
- // position in StructLike
- private static final int DATA_RECORD_COUNT_POSITION = 2;
- private static final int DATA_FILE_COUNT_POSITION = 3;
- private static final int TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION = 4;
- private static final int POSITION_DELETE_RECORD_COUNT_POSITION = 5;
- private static final int POSITION_DELETE_FILE_COUNT_POSITION = 6;
- private static final int EQUALITY_DELETE_RECORD_COUNT_POSITION = 7;
- private static final int EQUALITY_DELETE_FILE_COUNT_POSITION = 8;
- private static final int TOTAL_RECORD_COUNT_POSITION = 9;
- private static final int LAST_UPDATED_AT_POSITION = 10;
- private static final int LAST_UPDATED_SNAPSHOT_ID_POSITION = 11;
- private static final int DV_COUNT_POSITION = 12;
-
@Test
public void testPartitionStatsOnEmptyTable() throws Exception {
Table testTable =
@@ -223,10 +180,7 @@ public abstract class PartitionStatsHandlerTestBase {
partitionData.set(13, new BigDecimal("12345678901234567890.1234567890"));
partitionData.set(14,
Literal.of("10:10:10").to(Types.TimeType.get()).value());
- PartitionStats partitionStats = new PartitionStats(partitionData,
RANDOM.nextInt(10));
- partitionStats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
- partitionStats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
- partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L *
RANDOM.nextInt(20));
+ PartitionStats partitionStats = randomStats(partitionData);
List<PartitionStats> expected = Collections.singletonList(partitionStats);
PartitionStatisticsFile statisticsFile =
PartitionStatsHandler.writePartitionStatsFile(testTable, 42L,
dataSchema, expected);
@@ -262,14 +216,8 @@ public abstract class PartitionStatsHandlerTestBase {
ImmutableList.Builder<PartitionStats> partitionListBuilder =
ImmutableList.builder();
for (int i = 0; i < 5; i++) {
- PartitionData partitionData =
- new
PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
- partitionData.set(0, RANDOM.nextInt());
-
- PartitionStats stats = new PartitionStats(partitionData,
RANDOM.nextInt(10));
- stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
- stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
- stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L *
RANDOM.nextInt(20));
+ PartitionStats stats =
+
randomStats(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
stats.set(POSITION_DELETE_RECORD_COUNT_POSITION, null);
stats.set(POSITION_DELETE_FILE_COUNT_POSITION, null);
stats.set(EQUALITY_DELETE_RECORD_COUNT_POSITION, null);
@@ -315,8 +263,12 @@ public abstract class PartitionStatsHandlerTestBase {
}
}
+ /**
+ * @deprecated will be removed in 1.12.0
+ */
@SuppressWarnings("checkstyle:MethodLength")
@Test
+ @Deprecated
public void testPartitionStats() throws Exception {
Table testTable =
TestTables.create(
@@ -611,7 +563,11 @@ public abstract class PartitionStatsHandlerTestBase {
assertThat(PartitionStatsHandler.latestStatsFile(testTable,
snapshotBranchBId)).isNull();
}
+ /**
+ * @deprecated will be removed in 1.12.0
+ */
@Test
+ @Deprecated
public void testReadingStatsWithInvalidSchema() throws Exception {
PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table testTable =
@@ -677,7 +633,11 @@ public abstract class PartitionStatsHandlerTestBase {
assertThat(partitionStats.get(0).dataFileCount()).isEqualTo(2);
}
+ /**
+ * @deprecated will be removed in 1.12.0
+ */
@Test
+ @Deprecated
public void testV2toV3SchemaEvolution() throws Exception {
Table testTable =
TestTables.create(
@@ -718,14 +678,6 @@ public abstract class PartitionStatsHandlerTestBase {
}
}
- private static StructLike partitionRecord(
- Types.StructType partitionType, String val1, String val2) {
- GenericRecord record = GenericRecord.create(partitionType);
- record.set(0, val1);
- record.set(1, val2);
- return record;
- }
-
private static void computeAndValidatePartitionStats(
Table testTable, Schema recordSchema, Tuple... expectedValues) throws
IOException {
// compute and commit partition stats file
@@ -760,38 +712,6 @@ public abstract class PartitionStatsHandlerTestBase {
.containsExactlyInAnyOrder(expectedValues);
}
- private File tempDir(String folderName) throws IOException {
- return java.nio.file.Files.createTempDirectory(temp.toPath(),
folderName).toFile();
- }
-
- private Schema invalidOldSchema(Types.StructType unifiedPartitionType) {
- // field ids starts from 0 instead of 1
- return new Schema(
- Types.NestedField.required(0, PARTITION_FIELD_NAME,
unifiedPartitionType),
- Types.NestedField.required(1, SPEC_ID.name(), Types.IntegerType.get()),
- Types.NestedField.required(2, DATA_RECORD_COUNT.name(),
Types.LongType.get()),
- Types.NestedField.required(3, DATA_FILE_COUNT.name(),
Types.IntegerType.get()),
- Types.NestedField.required(4, TOTAL_DATA_FILE_SIZE_IN_BYTES.name(),
Types.LongType.get()),
- Types.NestedField.optional(5, POSITION_DELETE_RECORD_COUNT.name(),
Types.LongType.get()),
- Types.NestedField.optional(6, POSITION_DELETE_FILE_COUNT.name(),
Types.IntegerType.get()),
- Types.NestedField.optional(7, EQUALITY_DELETE_RECORD_COUNT.name(),
Types.LongType.get()),
- Types.NestedField.optional(8, EQUALITY_DELETE_FILE_COUNT.name(),
Types.IntegerType.get()),
- Types.NestedField.optional(9, TOTAL_RECORD_COUNT.name(),
Types.LongType.get()),
- Types.NestedField.optional(10, LAST_UPDATED_AT.name(),
Types.LongType.get()),
- Types.NestedField.optional(11, LAST_UPDATED_SNAPSHOT_ID.name(),
Types.LongType.get()));
- }
-
- private PartitionStats randomStats(Types.StructType partitionType) {
- PartitionData partitionData = new PartitionData(partitionType);
- partitionData.set(0, RANDOM.nextInt());
-
- PartitionStats stats = new PartitionStats(partitionData,
RANDOM.nextInt(10));
- stats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong());
- stats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt());
- stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L *
RANDOM.nextInt(20));
- return stats;
- }
-
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean isEqual(
Comparator<StructLike> partitionComparator, PartitionStats stats1,
PartitionStats stats2) {
diff --git
a/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java
b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java
new file mode 100644
index 0000000000..54e03180ce
--- /dev/null
+++
b/core/src/test/java/org/apache/iceberg/avro/TestAvroPartitionStatisticsScan.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatisticsScanTestBase;
+
+public class TestAvroPartitionStatisticsScan extends
PartitionStatisticsScanTestBase {
+
+ public FileFormat format() {
+ return FileFormat.AVRO;
+ }
+}
diff --git
a/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java
b/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java
new file mode 100644
index 0000000000..2040f046ee
--- /dev/null
+++
b/orc/src/test/java/org/apache/iceberg/orc/TestOrcPartitionStatisticsScan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatisticsScanTestBase;
+
+public class TestOrcPartitionStatisticsScan extends
PartitionStatisticsScanTestBase {
+ @Override
+ public FileFormat format() {
+ return FileFormat.ORC;
+ }
+
+ @Override
+ public void testScanPartitionStatsForCurrentSnapshot() throws Exception {
+ assertThatThrownBy(super::testScanPartitionStatsForCurrentSnapshot)
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot write using unregistered internal data format:
ORC");
+ }
+
+ @Override
+ public void testScanPartitionStatsForOlderSnapshot() throws Exception {
+ assertThatThrownBy(super::testScanPartitionStatsForOlderSnapshot)
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot write using unregistered internal data format:
ORC");
+ }
+
+ @Override
+ public void testReadingStatsWithInvalidSchema() throws Exception {
+ assertThatThrownBy(super::testReadingStatsWithInvalidSchema)
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot write using unregistered internal data format:
ORC");
+ }
+
+ @Override
+ public void testV2toV3SchemaEvolution() throws Exception {
+ assertThatThrownBy(super::testV2toV3SchemaEvolution)
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Cannot write using unregistered internal data format:
ORC");
+ }
+}
diff --git
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java
new file mode 100644
index 0000000000..5152e31b28
--- /dev/null
+++
b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetPartitionStatisticsScan.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionStatisticsScanTestBase;
+
+public class TestParquetPartitionStatisticsScan extends
PartitionStatisticsScanTestBase {
+
+ @Override
+ public FileFormat format() {
+ return FileFormat.PARQUET;
+ }
+}