kasakrisz commented on code in PR #5846:
URL: https://github.com/apache/hive/pull/5846#discussion_r2141717969


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java:
##########
@@ -39,121 +38,131 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.InternalReader;
-import org.apache.iceberg.avro.InternalWriter;
+import org.apache.iceberg.data.parquet.InternalWriter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
 import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.types.Types.IntegerType;
 import org.apache.iceberg.types.Types.LongType;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.SnapshotUtil;
 
-// TODO: remove class once Iceberg PR #11216 is merged and released
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 
 /**
  * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses 
generic readers and writers
  * to support writing and reading of the stats in table default format.
  */
-public final class PartitionStatsHandler {
+public class PartitionStatsHandler {
 
   private PartitionStatsHandler() {
   }
 
-  public enum Column {
-    PARTITION(0),
-    SPEC_ID(1),
-    DATA_RECORD_COUNT(2),
-    DATA_FILE_COUNT(3),
-    TOTAL_DATA_FILE_SIZE_IN_BYTES(4),
-    POSITION_DELETE_RECORD_COUNT(5),
-    POSITION_DELETE_FILE_COUNT(6),
-    EQUALITY_DELETE_RECORD_COUNT(7),
-    EQUALITY_DELETE_FILE_COUNT(8),
-    TOTAL_RECORD_COUNT(9),
-    LAST_UPDATED_AT(10),
-    LAST_UPDATED_SNAPSHOT_ID(11);
-
-    private final int id;
-
-    Column(int id) {
-      this.id = id;
-    }
-
-    public int id() {
-      return id;
-    }
-  }
+  public static final int PARTITION_FIELD_ID = 0;
+  public static final String PARTITION_FIELD_NAME = "partition";
+  public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", 
IntegerType.get());
+  public static final NestedField DATA_RECORD_COUNT =
+      NestedField.required(2, "data_record_count", LongType.get());
+  public static final NestedField DATA_FILE_COUNT =
+      NestedField.required(3, "data_file_count", IntegerType.get());
+  public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
+      NestedField.required(4, "total_data_file_size_in_bytes", LongType.get());
+  public static final NestedField POSITION_DELETE_RECORD_COUNT =
+      NestedField.optional(5, "position_delete_record_count", LongType.get());
+  public static final NestedField POSITION_DELETE_FILE_COUNT =
+      NestedField.optional(6, "position_delete_file_count", IntegerType.get());
+  public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
+      NestedField.optional(7, "equality_delete_record_count", LongType.get());
+  public static final NestedField EQUALITY_DELETE_FILE_COUNT =
+      NestedField.optional(8, "equality_delete_file_count", IntegerType.get());
+  public static final NestedField TOTAL_RECORD_COUNT =
+      NestedField.optional(9, "total_record_count", LongType.get());
+  public static final NestedField LAST_UPDATED_AT =
+      NestedField.optional(10, "last_updated_at", LongType.get());
+  public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
+      NestedField.optional(11, "last_updated_snapshot_id", LongType.get());
 
   /**
-   * Generates the partition stats file schema based on a given partition type.
+   * Generates the partition stats file schema based on a combined partition 
type which considers
+   * all specs in a table.
    *
-   * <p>Note: Provide the unified partition schema type as mentioned in the 
spec.
-   *
-   * @param partitionType unified partition schema type.
+   * @param unifiedPartitionType unified partition schema type. Could be 
calculated by {@link
+   *     Partitioning#partitionType(Table)}.
    * @return a schema that corresponds to the provided unified partition type.
    */
-  public static Schema schema(StructType partitionType) {
-    Preconditions.checkState(!partitionType.fields().isEmpty(), "table must be 
partitioned");
+  public static Schema schema(StructType unifiedPartitionType) {
+    Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table 
must be partitioned");
     return new Schema(
-        NestedField.required(1, Column.PARTITION.name(), partitionType),
-        NestedField.required(2, Column.SPEC_ID.name(), IntegerType.get()),
-        NestedField.required(3, Column.DATA_RECORD_COUNT.name(), 
LongType.get()),
-        NestedField.required(4, Column.DATA_FILE_COUNT.name(), 
IntegerType.get()),
-        NestedField.required(5, Column.TOTAL_DATA_FILE_SIZE_IN_BYTES.name(), 
LongType.get()),
-        NestedField.optional(6, Column.POSITION_DELETE_RECORD_COUNT.name(), 
LongType.get()),
-        NestedField.optional(7, Column.POSITION_DELETE_FILE_COUNT.name(), 
IntegerType.get()),
-        NestedField.optional(8, Column.EQUALITY_DELETE_RECORD_COUNT.name(), 
LongType.get()),
-        NestedField.optional(9, Column.EQUALITY_DELETE_FILE_COUNT.name(), 
IntegerType.get()),
-        NestedField.optional(10, Column.TOTAL_RECORD_COUNT.name(), 
LongType.get()),
-        NestedField.optional(11, Column.LAST_UPDATED_AT.name(), 
LongType.get()),
-        NestedField.optional(12, Column.LAST_UPDATED_SNAPSHOT_ID.name(), 
LongType.get()));
+        NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, 
unifiedPartitionType),
+        SPEC_ID,
+        DATA_RECORD_COUNT,
+        DATA_FILE_COUNT,
+        TOTAL_DATA_FILE_SIZE_IN_BYTES,
+        POSITION_DELETE_RECORD_COUNT,
+        POSITION_DELETE_FILE_COUNT,
+        EQUALITY_DELETE_RECORD_COUNT,
+        EQUALITY_DELETE_FILE_COUNT,
+        TOTAL_RECORD_COUNT,
+        LAST_UPDATED_AT,
+        LAST_UPDATED_SNAPSHOT_ID);
   }
 
   /**
    * Computes and writes the {@link PartitionStatisticsFile} for a given 
table's current snapshot.
    *
    * @param table The {@link Table} for which the partition statistics is 
computed.
-   * @return {@link PartitionStatisticsFile} for the current snapshot.
+   * @return {@link PartitionStatisticsFile} for the current snapshot, or null 
if no statistics are
+   *     present.
    */
-  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) {
-    return computeAndWriteStatsFile(table, null);
+  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) 
throws IOException {
+    if (table.currentSnapshot() == null) {
+      return null;
+    }
+
+    return computeAndWriteStatsFile(table, 
table.currentSnapshot().snapshotId());
   }
 
   /**
-   * Computes and writes the {@link PartitionStatisticsFile} for a given table 
and branch.
+   * Computes and writes the {@link PartitionStatisticsFile} for a given table 
and snapshot.
    *
    * @param table The {@link Table} for which the partition statistics is 
computed.
-   * @param branch A branch information to select the required snapshot.
-   * @return {@link PartitionStatisticsFile} for the given branch.
+   * @param snapshotId snapshot for which partition statistics are computed.
+   * @return {@link PartitionStatisticsFile} for the given snapshot, or null 
if no statistics are
+   *     present.
    */
-  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
String branch) {
-    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
-    if (currentSnapshot == null) {
-      Preconditions.checkArgument(
-          branch == null, "Couldn't find the snapshot for the branch %s", 
branch);
+  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, 
long snapshotId)
+      throws IOException {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", 
snapshotId);
+
+    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
snapshot);
+    if (stats.isEmpty()) {
       return null;
     }
 
     StructType partitionType = Partitioning.partitionType(table);
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, 
currentSnapshot);
     List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, 
partitionType);
     return writePartitionStatsFile(
-        table, currentSnapshot.snapshotId(), schema(partitionType), 
sortedStats.iterator());
+        table, snapshot.snapshotId(), schema(partitionType), sortedStats);
   }
 
   @VisibleForTesting
   static PartitionStatisticsFile writePartitionStatsFile(
-      Table table, long snapshotId, Schema dataSchema, 
Iterator<PartitionStats> records) {
-    OutputFile outputFile = newPartitionStatsFile(table, snapshotId);
+      Table table, long snapshotId, Schema dataSchema, 
Iterable<PartitionStats> records)
+      throws IOException {
+    FileFormat fileFormat =
+        FileFormat.fromString(
+            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, 
DEFAULT_FILE_FORMAT_DEFAULT));
+
+    OutputFile outputFile = newPartitionStatsFile(table, fileFormat, 
snapshotId);

Review Comment:
   I added a fallback to the Avro file format when `fileFormat` is ORC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to