kasakrisz commented on code in PR #5846: URL: https://github.com/apache/hive/pull/5846#discussion_r2141717969
########## iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java: ########## @@ -39,121 +38,131 @@ import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.InternalReader; -import org.apache.iceberg.avro.InternalWriter; +import org.apache.iceberg.data.parquet.InternalWriter; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; -import org.apache.iceberg.util.SnapshotUtil; -// TODO: remove class once Iceberg PR #11216 is merged and released +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; /** * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers * to support writing and reading of the stats in table default format. 
*/ -public final class PartitionStatsHandler { +public class PartitionStatsHandler { private PartitionStatsHandler() { } - public enum Column { - PARTITION(0), - SPEC_ID(1), - DATA_RECORD_COUNT(2), - DATA_FILE_COUNT(3), - TOTAL_DATA_FILE_SIZE_IN_BYTES(4), - POSITION_DELETE_RECORD_COUNT(5), - POSITION_DELETE_FILE_COUNT(6), - EQUALITY_DELETE_RECORD_COUNT(7), - EQUALITY_DELETE_FILE_COUNT(8), - TOTAL_RECORD_COUNT(9), - LAST_UPDATED_AT(10), - LAST_UPDATED_SNAPSHOT_ID(11); - - private final int id; - - Column(int id) { - this.id = id; - } - - public int id() { - return id; - } - } + public static final int PARTITION_FIELD_ID = 0; + public static final String PARTITION_FIELD_NAME = "partition"; + public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get()); + public static final NestedField DATA_RECORD_COUNT = + NestedField.required(2, "data_record_count", LongType.get()); + public static final NestedField DATA_FILE_COUNT = + NestedField.required(3, "data_file_count", IntegerType.get()); + public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES = + NestedField.required(4, "total_data_file_size_in_bytes", LongType.get()); + public static final NestedField POSITION_DELETE_RECORD_COUNT = + NestedField.optional(5, "position_delete_record_count", LongType.get()); + public static final NestedField POSITION_DELETE_FILE_COUNT = + NestedField.optional(6, "position_delete_file_count", IntegerType.get()); + public static final NestedField EQUALITY_DELETE_RECORD_COUNT = + NestedField.optional(7, "equality_delete_record_count", LongType.get()); + public static final NestedField EQUALITY_DELETE_FILE_COUNT = + NestedField.optional(8, "equality_delete_file_count", IntegerType.get()); + public static final NestedField TOTAL_RECORD_COUNT = + NestedField.optional(9, "total_record_count", LongType.get()); + public static final NestedField LAST_UPDATED_AT = + NestedField.optional(10, "last_updated_at", LongType.get()); + public static final NestedField 
LAST_UPDATED_SNAPSHOT_ID = + NestedField.optional(11, "last_updated_snapshot_id", LongType.get()); /** - * Generates the partition stats file schema based on a given partition type. + * Generates the partition stats file schema based on a combined partition type which considers + * all specs in a table. * - * <p>Note: Provide the unified partition schema type as mentioned in the spec. - * - * @param partitionType unified partition schema type. + * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link + * Partitioning#partitionType(Table)}. * @return a schema that corresponds to the provided unified partition type. */ - public static Schema schema(StructType partitionType) { - Preconditions.checkState(!partitionType.fields().isEmpty(), "table must be partitioned"); + public static Schema schema(StructType unifiedPartitionType) { + Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned"); return new Schema( - NestedField.required(1, Column.PARTITION.name(), partitionType), - NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()), - NestedField.required(3, Column.DATA_RECORD_COUNT.name(), LongType.get()), - NestedField.required(4, Column.DATA_FILE_COUNT.name(), IntegerType.get()), - NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), LongType.get()), - NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), LongType.get()), - NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), IntegerType.get()), - NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), LongType.get()), - NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), IntegerType.get()), - NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), LongType.get()), - NestedField.optional(11, Column.LAST_UPDATED_AT.name(), LongType.get()), - NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), LongType.get())); + 
NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType), + SPEC_ID, + DATA_RECORD_COUNT, + DATA_FILE_COUNT, + TOTAL_DATA_FILE_SIZE_IN_BYTES, + POSITION_DELETE_RECORD_COUNT, + POSITION_DELETE_FILE_COUNT, + EQUALITY_DELETE_RECORD_COUNT, + EQUALITY_DELETE_FILE_COUNT, + TOTAL_RECORD_COUNT, + LAST_UPDATED_AT, + LAST_UPDATED_SNAPSHOT_ID); } /** * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot. * * @param table The {@link Table} for which the partition statistics is computed. - * @return {@link PartitionStatisticsFile} for the current snapshot. + * @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are + * present. */ - public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) { - return computeAndWriteStatsFile(table, null); + public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException { + if (table.currentSnapshot() == null) { + return null; + } + + return computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId()); } /** - * Computes and writes the {@link PartitionStatisticsFile} for a given table and branch. + * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot. * * @param table The {@link Table} for which the partition statistics is computed. - * @param branch A branch information to select the required snapshot. - * @return {@link PartitionStatisticsFile} for the given branch. + * @param snapshotId snapshot for which partition statistics are computed. + * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are + * present. 
*/ - public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, String branch) { - Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); - if (currentSnapshot == null) { - Preconditions.checkArgument( - branch == null, "Couldn't find the snapshot for the branch %s", branch); + public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId) + throws IOException { + Snapshot snapshot = table.snapshot(snapshotId); + Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId); + + Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot); + if (stats.isEmpty()) { return null; } StructType partitionType = Partitioning.partitionType(table); - Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, currentSnapshot); List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType); return writePartitionStatsFile( - table, currentSnapshot.snapshotId(), schema(partitionType), sortedStats.iterator()); + table, snapshot.snapshotId(), schema(partitionType), sortedStats); } @VisibleForTesting static PartitionStatisticsFile writePartitionStatsFile( - Table table, long snapshotId, Schema dataSchema, Iterator<PartitionStats> records) { - OutputFile outputFile = newPartitionStatsFile(table, snapshotId); + Table table, long snapshotId, Schema dataSchema, Iterable<PartitionStats> records) + throws IOException { + FileFormat fileFormat = + FileFormat.fromString( + table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)); + + OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId); Review Comment: I added fallback to Avro fileformat when `fileFormat` is Orc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org For additional commands, e-mail: gitbox-help@hive.apache.org