This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f496bc9ff5 HIVE-28278: Iceberg: Stats: IllegalStateException Invalid file: file length 0 (Denys Kuzmenko, reviewed by Ayush Saxena, Butao Zhang)
9f496bc9ff5 is described below

commit 9f496bc9ff5aa8ad9f27c71ff9d29bd0720a506d
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon May 27 16:14:20 2024 +0300

    HIVE-28278: Iceberg: Stats: IllegalStateException Invalid file: file length 0 (Denys Kuzmenko, reviewed by Ayush Saxena, Butao Zhang)
    
    Closes #5261
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 119 +++++++++++----------
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  16 +++
 .../iceberg/mr/hive/TestHiveIcebergStatistics.java |   5 +-
 ...ery_iceberg_metadata_of_partitioned_table.q.out |   8 ++
 ...y_iceberg_metadata_of_unpartitioned_table.q.out | Bin 39970 -> 40270 bytes
 5 files changed, 91 insertions(+), 57 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 275aa993d46..3157fbb0411 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -35,6 +36,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +48,6 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -144,6 +145,8 @@ import org.apache.iceberg.ExpireSnapshots;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.FindFiles;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetadataTableUtils;
@@ -161,6 +164,7 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -195,7 +199,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -295,7 +298,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
    */
   static class HiveIcebergNoJobCommitter extends HiveIcebergOutputCommitter {
     @Override
-    public void commitJob(JobContext originalContext) throws IOException {
+    public void commitJob(JobContext originalContext) {
       // do nothing
     }
   }
@@ -389,7 +392,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
       }
     }
     predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
-    Expression filterExpr = (Expression) 
HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
+    Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf, 
predicate.pushedPredicate);
     if (filterExpr != null) {
       SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS, 
filterExpr);
     }
@@ -500,33 +503,58 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   @Override
   public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable, List<ColumnStatistics> colStats) {
     Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    String snapshotId = String.format("%s-STATS-%d", tbl.name(), 
tbl.currentSnapshot().snapshotId());
-    return writeColStats(colStats.get(0), tbl, snapshotId);
+    return writeColStats(colStats.get(0), tbl);
   }
 
-  private boolean writeColStats(ColumnStatistics tableColStats, Table tbl, 
String snapshotId) {
+  private boolean writeColStats(ColumnStatistics tableColStats, Table tbl) {
     try {
-      boolean rewriteStats = removeColStatsIfExists(tbl);
-      if (!rewriteStats) {
+      if (!shouldRewriteColStats(tbl)) {
         checkAndMergeColStats(tableColStats, tbl);
       }
       // Currently, we are only serializing table level stats.
       byte[] serializeColStats = SerializationUtils.serialize(tableColStats);
-      try (PuffinWriter writer = 
Puffin.write(tbl.io().newOutputFile(getColStatsPath(tbl).toString()))
+      StatisticsFile statisticsFile;
+      String statsPath = tbl.location() + STATS + UUID.randomUUID();
+
+      try (PuffinWriter puffinWriter = 
Puffin.write(tbl.io().newOutputFile(statsPath))
           .createdBy(Constants.HIVE_ENGINE).build()) {
-        writer.add(new Blob(tbl.name() + "-" + snapshotId, 
ImmutableList.of(1), tbl.currentSnapshot().snapshotId(),
-            tbl.currentSnapshot().sequenceNumber(), 
ByteBuffer.wrap(serializeColStats), PuffinCompressionCodec.NONE,
-            ImmutableMap.of()));
-        writer.finish();
-        return true;
+        long snapshotId = tbl.currentSnapshot().snapshotId();
+        long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
+        puffinWriter.add(
+            new Blob(
+              ColumnStatisticsObj.class.getSimpleName(),
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap(serializeColStats),
+              PuffinCompressionCodec.NONE,
+              ImmutableMap.of()
+          ));
+        puffinWriter.finish();
+
+        statisticsFile =
+          new GenericStatisticsFile(
+            snapshotId,
+            statsPath,
+            puffinWriter.fileSize(),
+            puffinWriter.footerSize(),
+            puffinWriter.writtenBlobsMetadata().stream()
+              .map(GenericBlobMetadata::from)
+              .collect(ImmutableList.toImmutableList())
+          );
       } catch (IOException e) {
         LOG.warn("Unable to write stats to puffin file {}", e.getMessage());
         return false;
       }
-    } catch (InvalidObjectException | IOException e) {
+      tbl.updateStatistics()
+          .setStatistics(statisticsFile.snapshotId(), statisticsFile)
+          .commit();
+      return true;
+
+    } catch (Exception e) {
       LOG.warn("Unable to invalidate or merge stats: {}", e.getMessage());
-      return false;
     }
+    return false;
   }
 
   @Override
@@ -536,36 +564,29 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
   }
 
   private boolean canProvideColStats(Table table, long snapshotId) {
-    Path statsPath = getColStatsPath(table, snapshotId);
-    try {
-      FileSystem fs = statsPath.getFileSystem(conf);
-      return  fs.exists(statsPath);
-    } catch (Exception e) {
-      LOG.warn("Exception when trying to find Iceberg column stats for 
table:{} , snapshot:{} , " +
-          "statsPath: {} , stack trace: {}", table.name(), 
table.currentSnapshot(), statsPath, e);
-    }
-    return false;
+    return IcebergTableUtil.getColStatsPath(table, snapshotId).isPresent();
   }
 
   @Override
   public List<ColumnStatisticsObj> 
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    Path statsPath = getColStatsPath(table);
-    LOG.info("Using stats from puffin file at: {}", statsPath);
-    return readColStats(table, statsPath).getStatsObj();
+    return IcebergTableUtil.getColStatsPath(table).map(statsPath -> 
readColStats(table, statsPath))
+      .orElse(new ColumnStatistics()).getStatsObj();
   }
 
   private ColumnStatistics readColStats(Table table, Path statsPath) {
     try (PuffinReader reader = 
Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
       List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
-      Map<BlobMetadata, ColumnStatistics> collect = 
Streams.stream(reader.readAll(blobMetadata)).collect(
-          Collectors.toMap(Pair::first, blobMetadataByteBufferPair -> 
SerializationUtils.deserialize(
-              ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
-      return collect.get(blobMetadata.get(0));
-    } catch (IOException | IndexOutOfBoundsException e) {
-      LOG.warn(" Unable to read iceberg col stats from puffin files: ", e);
-      return new ColumnStatistics();
+      Iterator<ByteBuffer> it = 
Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
+      if (it.hasNext()) {
+        byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
+        LOG.info("Using col stats from : {}", statsPath);
+        return SerializationUtils.deserialize(byteBuffer);
+      }
+    } catch (Exception e) {
+      LOG.warn(" Unable to read col stats: ", e);
     }
+    return new ColumnStatistics();
   }
 
   @Override
@@ -591,30 +612,18 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
         .toUpperCase();
   }
 
-  private Path getColStatsPath(Table table) {
-    return getColStatsPath(table, table.currentSnapshot().snapshotId());
-  }
-
-  private Path getColStatsPath(Table table, long snapshotId) {
-    return new Path(table.location() + STATS + snapshotId);
-  }
-
-  private boolean removeColStatsIfExists(Table tbl) throws IOException {
-    Path statsPath = getColStatsPath(tbl);
-    FileSystem fs = statsPath.getFileSystem(conf);
-    if (fs.exists(statsPath)) {
-      // Analyze table and stats updater thread
-      return fs.delete(statsPath, true);
-    }
+  private boolean shouldRewriteColStats(Table tbl) {
     return 
SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
-      .filter(opType -> HiveOperation.ANALYZE_TABLE == opType)
-      .isPresent();
+              .filter(opType -> HiveOperation.ANALYZE_TABLE == 
opType).isPresent() ||
+          IcebergTableUtil.getColStatsPath(tbl).isPresent();
   }
 
   private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl) 
throws InvalidObjectException {
     Long previousSnapshotId = tbl.currentSnapshot().parentId();
     if (previousSnapshotId != null && canProvideColStats(tbl, 
previousSnapshotId)) {
-      ColumnStatistics statsObjOld = readColStats(tbl, getColStatsPath(tbl, 
previousSnapshotId));
+      ColumnStatistics statsObjOld = IcebergTableUtil.getColStatsPath(tbl, 
previousSnapshotId)
+          .map(statsPath -> readColStats(tbl, statsPath))
+          .orElse(null);
       if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 && 
!statsObjNew.getStatsObj().isEmpty()) {
         MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
       }
@@ -2017,7 +2026,7 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
                 .findField(partitionField.sourceId()).type());
         Object value = Conversions.fromPartitionString(resultType, 
entry.getValue());
         TransformSpec.TransformType transformType = 
TransformSpec.fromString(partitionField.transform().toString());
-        Iterable iterable = () -> Collections.singletonList(value).iterator();
+        Iterable<?> iterable = () -> 
Collections.singletonList(value).iterator();
         if (TransformSpec.TransformType.IDENTITY == transformType) {
           Expression boundPredicate = Expressions.in(partitionField.name(), 
iterable);
           finalExp = Expressions.and(finalExp, boundPredicate);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index a5df4d7b941..79454f45ee8 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -27,6 +27,8 @@ import java.util.function.BinaryOperator;
 import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -133,6 +135,20 @@ public class IcebergTableUtil {
     return getTable(configuration, properties, false);
   }
 
+  static Optional<Path> getColStatsPath(Table table) {
+    return getColStatsPath(table, table.currentSnapshot().snapshotId());
+  }
+
+  static Optional<Path> getColStatsPath(Table table, long snapshotId) {
+    return table.statisticsFiles().stream()
+      .filter(stats -> stats.snapshotId() == snapshotId)
+      .filter(stats -> stats.blobMetadata().stream()
+        .anyMatch(metadata -> 
ColumnStatisticsObj.class.getSimpleName().equals(metadata.type()))
+      )
+      .map(stats -> new Path(stats.path()))
+      .findAny();
+  }
+
   /**
    * Create {@link PartitionSpec} based on the partition information stored in
    * {@link TransformSpec}.
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index f377b523be5..187b9b462ff 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -38,7 +38,6 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
-import static org.apache.iceberg.mr.hive.HiveIcebergStorageHandler.STATS;
 
 /**
  * Tests verifying correct statistics generation behaviour on Iceberg tables 
triggered by: ANALYZE queries, inserts,
@@ -275,7 +274,9 @@ public class TestHiveIcebergStatistics extends 
HiveIcebergStorageHandlerWithEngi
     shell.executeStatement(insert);
 
     table.refresh();
-    Path tblColPath = new Path(table.location() + STATS + 
table.currentSnapshot().snapshotId());
+
+    Path tblColPath = IcebergTableUtil.getColStatsPath(table).orElse(null);
+    Assert.assertNotNull(tblColPath);
     // Check that if colPath is created correctly
     
Assert.assertTrue(tblColPath.getFileSystem(shell.getHiveConf()).exists(tblColPath));
     List<Object[]> result = shell.executeStatement("SELECT * FROM customers");
diff --git a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
index 136ca8abd5c..b705af1b3ab 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
@@ -400,6 +400,10 @@ hdfs://### HDFS PATH ###
 hdfs://### HDFS PATH ###
 hdfs://### HDFS PATH ###
 hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
 PREHOOK: query: select file from default.ice_meta_3.metadata_log_entries
 PREHOOK: type: QUERY
 PREHOOK: Input: default@ice_meta_3
@@ -696,6 +700,10 @@ hdfs://### HDFS PATH ###
 hdfs://### HDFS PATH ###
 hdfs://### HDFS PATH ###
 hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
 PREHOOK: query: select file from default.ice_meta_3.metadata_log_entries
 PREHOOK: type: QUERY
 PREHOOK: Input: default@ice_meta_3
diff --git a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out
index fe8f1ec128f..ecf586dfa80 100644
Binary files a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out and b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out differ

Reply via email to