This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9f496bc9ff5 HIVE-28278: Iceberg: Stats: IllegalStateException Invalid file: file length 0 (Denys Kuzmenko, reviewed by Ayush Saxena, Butao Zhang)
9f496bc9ff5 is described below
commit 9f496bc9ff5aa8ad9f27c71ff9d29bd0720a506d
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon May 27 16:14:20 2024 +0300
HIVE-28278: Iceberg: Stats: IllegalStateException Invalid file: file length 0 (Denys Kuzmenko, reviewed by Ayush Saxena, Butao Zhang)
Closes #5261
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 119 +++++++++++----------
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 16 +++
.../iceberg/mr/hive/TestHiveIcebergStatistics.java | 5 +-
...ery_iceberg_metadata_of_partitioned_table.q.out | 8 ++
...y_iceberg_metadata_of_unpartitioned_table.q.out | Bin 39970 -> 40270 bytes
5 files changed, 91 insertions(+), 57 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 275aa993d46..3157fbb0411 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -35,6 +36,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +48,6 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -144,6 +145,8 @@ import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FindFiles;
+import org.apache.iceberg.GenericBlobMetadata;
+import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
@@ -161,6 +164,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -195,7 +199,6 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -295,7 +298,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
*/
static class HiveIcebergNoJobCommitter extends HiveIcebergOutputCommitter {
@Override
- public void commitJob(JobContext originalContext) throws IOException {
+ public void commitJob(JobContext originalContext) {
// do nothing
}
}
@@ -389,7 +392,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
}
predicate.pushedPredicate = (ExprNodeGenericFuncDesc) pushedPredicate;
- Expression filterExpr = (Expression)
HiveIcebergInputFormat.getFilterExpr(conf, predicate.pushedPredicate);
+ Expression filterExpr = HiveIcebergInputFormat.getFilterExpr(conf,
predicate.pushedPredicate);
if (filterExpr != null) {
SessionStateUtil.addResource(conf, InputFormatConfig.QUERY_FILTERS,
filterExpr);
}
@@ -500,33 +503,58 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
@Override
public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table
hmsTable, List<ColumnStatistics> colStats) {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- String snapshotId = String.format("%s-STATS-%d", tbl.name(),
tbl.currentSnapshot().snapshotId());
- return writeColStats(colStats.get(0), tbl, snapshotId);
+ return writeColStats(colStats.get(0), tbl);
}
- private boolean writeColStats(ColumnStatistics tableColStats, Table tbl,
String snapshotId) {
+ private boolean writeColStats(ColumnStatistics tableColStats, Table tbl) {
try {
- boolean rewriteStats = removeColStatsIfExists(tbl);
- if (!rewriteStats) {
+ if (!shouldRewriteColStats(tbl)) {
checkAndMergeColStats(tableColStats, tbl);
}
// Currently, we are only serializing table level stats.
byte[] serializeColStats = SerializationUtils.serialize(tableColStats);
- try (PuffinWriter writer =
Puffin.write(tbl.io().newOutputFile(getColStatsPath(tbl).toString()))
+ StatisticsFile statisticsFile;
+ String statsPath = tbl.location() + STATS + UUID.randomUUID();
+
+ try (PuffinWriter puffinWriter =
Puffin.write(tbl.io().newOutputFile(statsPath))
.createdBy(Constants.HIVE_ENGINE).build()) {
- writer.add(new Blob(tbl.name() + "-" + snapshotId,
ImmutableList.of(1), tbl.currentSnapshot().snapshotId(),
- tbl.currentSnapshot().sequenceNumber(),
ByteBuffer.wrap(serializeColStats), PuffinCompressionCodec.NONE,
- ImmutableMap.of()));
- writer.finish();
- return true;
+ long snapshotId = tbl.currentSnapshot().snapshotId();
+ long snapshotSequenceNumber = tbl.currentSnapshot().sequenceNumber();
+ puffinWriter.add(
+ new Blob(
+ ColumnStatisticsObj.class.getSimpleName(),
+ ImmutableList.of(1),
+ snapshotId,
+ snapshotSequenceNumber,
+ ByteBuffer.wrap(serializeColStats),
+ PuffinCompressionCodec.NONE,
+ ImmutableMap.of()
+ ));
+ puffinWriter.finish();
+
+ statisticsFile =
+ new GenericStatisticsFile(
+ snapshotId,
+ statsPath,
+ puffinWriter.fileSize(),
+ puffinWriter.footerSize(),
+ puffinWriter.writtenBlobsMetadata().stream()
+ .map(GenericBlobMetadata::from)
+ .collect(ImmutableList.toImmutableList())
+ );
} catch (IOException e) {
LOG.warn("Unable to write stats to puffin file {}", e.getMessage());
return false;
}
- } catch (InvalidObjectException | IOException e) {
+ tbl.updateStatistics()
+ .setStatistics(statisticsFile.snapshotId(), statisticsFile)
+ .commit();
+ return true;
+
+ } catch (Exception e) {
LOG.warn("Unable to invalidate or merge stats: {}", e.getMessage());
- return false;
}
+ return false;
}
@Override
@@ -536,36 +564,29 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
}
private boolean canProvideColStats(Table table, long snapshotId) {
- Path statsPath = getColStatsPath(table, snapshotId);
- try {
- FileSystem fs = statsPath.getFileSystem(conf);
- return fs.exists(statsPath);
- } catch (Exception e) {
- LOG.warn("Exception when trying to find Iceberg column stats for
table:{} , snapshot:{} , " +
- "statsPath: {} , stack trace: {}", table.name(),
table.currentSnapshot(), statsPath, e);
- }
- return false;
+ return IcebergTableUtil.getColStatsPath(table, snapshotId).isPresent();
}
@Override
public List<ColumnStatisticsObj>
getColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- Path statsPath = getColStatsPath(table);
- LOG.info("Using stats from puffin file at: {}", statsPath);
- return readColStats(table, statsPath).getStatsObj();
+ return IcebergTableUtil.getColStatsPath(table).map(statsPath ->
readColStats(table, statsPath))
+ .orElse(new ColumnStatistics()).getStatsObj();
}
private ColumnStatistics readColStats(Table table, Path statsPath) {
try (PuffinReader reader =
Puffin.read(table.io().newInputFile(statsPath.toString())).build()) {
List<BlobMetadata> blobMetadata = reader.fileMetadata().blobs();
- Map<BlobMetadata, ColumnStatistics> collect =
Streams.stream(reader.readAll(blobMetadata)).collect(
- Collectors.toMap(Pair::first, blobMetadataByteBufferPair ->
SerializationUtils.deserialize(
- ByteBuffers.toByteArray(blobMetadataByteBufferPair.second()))));
- return collect.get(blobMetadata.get(0));
- } catch (IOException | IndexOutOfBoundsException e) {
- LOG.warn(" Unable to read iceberg col stats from puffin files: ", e);
- return new ColumnStatistics();
+ Iterator<ByteBuffer> it =
Iterables.transform(reader.readAll(blobMetadata), Pair::second).iterator();
+ if (it.hasNext()) {
+ byte[] byteBuffer = ByteBuffers.toByteArray(it.next());
+ LOG.info("Using col stats from : {}", statsPath);
+ return SerializationUtils.deserialize(byteBuffer);
+ }
+ } catch (Exception e) {
+ LOG.warn(" Unable to read col stats: ", e);
}
+ return new ColumnStatistics();
}
@Override
@@ -591,30 +612,18 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
.toUpperCase();
}
- private Path getColStatsPath(Table table) {
- return getColStatsPath(table, table.currentSnapshot().snapshotId());
- }
-
- private Path getColStatsPath(Table table, long snapshotId) {
- return new Path(table.location() + STATS + snapshotId);
- }
-
- private boolean removeColStatsIfExists(Table tbl) throws IOException {
- Path statsPath = getColStatsPath(tbl);
- FileSystem fs = statsPath.getFileSystem(conf);
- if (fs.exists(statsPath)) {
- // Analyze table and stats updater thread
- return fs.delete(statsPath, true);
- }
+ private boolean shouldRewriteColStats(Table tbl) {
return
SessionStateUtil.getQueryState(conf).map(QueryState::getHiveOperation)
- .filter(opType -> HiveOperation.ANALYZE_TABLE == opType)
- .isPresent();
+ .filter(opType -> HiveOperation.ANALYZE_TABLE ==
opType).isPresent() ||
+ IcebergTableUtil.getColStatsPath(tbl).isPresent();
}
private void checkAndMergeColStats(ColumnStatistics statsObjNew, Table tbl)
throws InvalidObjectException {
Long previousSnapshotId = tbl.currentSnapshot().parentId();
if (previousSnapshotId != null && canProvideColStats(tbl,
previousSnapshotId)) {
- ColumnStatistics statsObjOld = readColStats(tbl, getColStatsPath(tbl,
previousSnapshotId));
+ ColumnStatistics statsObjOld = IcebergTableUtil.getColStatsPath(tbl,
previousSnapshotId)
+ .map(statsPath -> readColStats(tbl, statsPath))
+ .orElse(null);
if (statsObjOld != null && statsObjOld.getStatsObjSize() != 0 &&
!statsObjNew.getStatsObj().isEmpty()) {
MetaStoreServerUtils.mergeColStats(statsObjNew, statsObjOld);
}
@@ -2017,7 +2026,7 @@ public class HiveIcebergStorageHandler implements
HiveStoragePredicateHandler, H
.findField(partitionField.sourceId()).type());
Object value = Conversions.fromPartitionString(resultType,
entry.getValue());
TransformSpec.TransformType transformType =
TransformSpec.fromString(partitionField.transform().toString());
- Iterable iterable = () -> Collections.singletonList(value).iterator();
+ Iterable<?> iterable = () ->
Collections.singletonList(value).iterator();
if (TransformSpec.TransformType.IDENTITY == transformType) {
Expression boundPredicate = Expressions.in(partitionField.name(),
iterable);
finalExp = Expressions.and(finalExp, boundPredicate);
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index a5df4d7b941..79454f45ee8 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -27,6 +27,8 @@ import java.util.function.BinaryOperator;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
@@ -133,6 +135,20 @@ public class IcebergTableUtil {
return getTable(configuration, properties, false);
}
+ static Optional<Path> getColStatsPath(Table table) {
+ return getColStatsPath(table, table.currentSnapshot().snapshotId());
+ }
+
+ static Optional<Path> getColStatsPath(Table table, long snapshotId) {
+ return table.statisticsFiles().stream()
+ .filter(stats -> stats.snapshotId() == snapshotId)
+ .filter(stats -> stats.blobMetadata().stream()
+ .anyMatch(metadata ->
ColumnStatisticsObj.class.getSimpleName().equals(metadata.type()))
+ )
+ .map(stats -> new Path(stats.path()))
+ .findAny();
+ }
+
/**
* Create {@link PartitionSpec} based on the partition information stored in
* {@link TransformSpec}.
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
index f377b523be5..187b9b462ff 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStatistics.java
@@ -38,7 +38,6 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
-import static org.apache.iceberg.mr.hive.HiveIcebergStorageHandler.STATS;
/**
* Tests verifying correct statistics generation behaviour on Iceberg tables
triggered by: ANALYZE queries, inserts,
@@ -275,7 +274,9 @@ public class TestHiveIcebergStatistics extends
HiveIcebergStorageHandlerWithEngi
shell.executeStatement(insert);
table.refresh();
- Path tblColPath = new Path(table.location() + STATS +
table.currentSnapshot().snapshotId());
+
+ Path tblColPath = IcebergTableUtil.getColStatsPath(table).orElse(null);
+ Assert.assertNotNull(tblColPath);
// Check that if colPath is created correctly
Assert.assertTrue(tblColPath.getFileSystem(shell.getHiveConf()).exists(tblColPath));
List<Object[]> result = shell.executeStatement("SELECT * FROM customers");
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
index 136ca8abd5c..b705af1b3ab 100644
---
a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
+++
b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_partitioned_table.q.out
@@ -400,6 +400,10 @@ hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
PREHOOK: query: select file from default.ice_meta_3.metadata_log_entries
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_meta_3
@@ -696,6 +700,10 @@ hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
+hdfs://### HDFS PATH ###
PREHOOK: query: select file from default.ice_meta_3.metadata_log_entries
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_meta_3
diff --git
a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out
b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out
index fe8f1ec128f..ecf586dfa80 100644
Binary files a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out and b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_metadata_of_unpartitioned_table.q.out differ