This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1271e8443bf HIVE-29205: Addendum: Iceberg: Upgrade iceberg version to 1.10.0 (#6235)
1271e8443bf is described below
commit 1271e8443bfdd29b46df78e3360f9175ca8acb62
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Fri Dec 19 10:40:16 2025 +0200
HIVE-29205: Addendum: Iceberg: Upgrade iceberg version to 1.10.0 (#6235)
---
.../org/apache/iceberg/hive/HiveTableTest.java | 7 +-
.../apache/iceberg/data/PartitionStatsHandler.java | 285 ----------
.../iceberg/mr/hive/BaseHiveIcebergMetaHook.java | 6 +-
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 3 +-
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 2 +-
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 43 +-
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 12 +
.../iceberg/data/TestPartitionStatsHandler.java | 605 ---------------------
.../iceberg/mr/hive/TestHiveIcebergRollback.java | 4 +-
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 25 +-
itests/hive-iceberg/pom.xml | 2 +-
pom.xml | 2 +-
12 files changed, 66 insertions(+), 930 deletions(-)
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 7bf66012030..8be9740be49 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -38,18 +38,17 @@
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hive.iceberg.org.apache.avro.generic.GenericData;
import org.apache.hive.iceberg.org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
-import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
@@ -259,7 +258,7 @@ public void testDropTable() throws IOException {
.as("Table manifest files should not exist")
.doesNotExist();
}
- assertThat(new File(((HasTableOperations) table).operations().current().metadataFileLocation()
+ assertThat(new File(TableUtil.metadataFileLocation(table)
.replace("file:", "")))
.as("Table metadata file should not exist")
.doesNotExist();
@@ -552,7 +551,7 @@ public void testRegisterHadoopTableToHiveCatalog() throws IOException, TExceptio
.hasMessage("Table does not exist: hivedb.table1");
// register the table to hive catalog using the latest metadata file
- String latestMetadataFile = ((BaseTable) table).operations().current().metadataFileLocation();
+ String latestMetadataFile = TableUtil.metadataFileLocation(table);
catalog.registerTable(identifier, "file:" + latestMetadataFile);
assertThat(HIVE_METASTORE_EXTENSION.metastoreClient().getTable(DB_NAME, "table1")).isNotNull();
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
deleted file mode 100644
index 20e819dec45..00000000000
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.data;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Locale;
-import java.util.UUID;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionStatisticsFile;
-import org.apache.iceberg.PartitionStats;
-import org.apache.iceberg.PartitionStatsUtil;
-import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.InternalReader;
-import org.apache.iceberg.data.parquet.InternalWriter;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.Types.IntegerType;
-import org.apache.iceberg.types.Types.LongType;
-import org.apache.iceberg.types.Types.NestedField;
-import org.apache.iceberg.types.Types.StructType;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-
-/**
- * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
- * to support writing and reading of the stats in table default format.
- */
-public class PartitionStatsHandler {
-
- private PartitionStatsHandler() {
- }
-
- public static final int PARTITION_FIELD_ID = 0;
- public static final String PARTITION_FIELD_NAME = "partition";
- public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
- public static final NestedField DATA_RECORD_COUNT =
- NestedField.required(2, "data_record_count", LongType.get());
- public static final NestedField DATA_FILE_COUNT =
- NestedField.required(3, "data_file_count", IntegerType.get());
- public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
- NestedField.required(4, "total_data_file_size_in_bytes", LongType.get());
- public static final NestedField POSITION_DELETE_RECORD_COUNT =
- NestedField.optional(5, "position_delete_record_count", LongType.get());
- public static final NestedField POSITION_DELETE_FILE_COUNT =
- NestedField.optional(6, "position_delete_file_count", IntegerType.get());
- public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
- NestedField.optional(7, "equality_delete_record_count", LongType.get());
- public static final NestedField EQUALITY_DELETE_FILE_COUNT =
- NestedField.optional(8, "equality_delete_file_count", IntegerType.get());
- public static final NestedField TOTAL_RECORD_COUNT =
- NestedField.optional(9, "total_record_count", LongType.get());
- public static final NestedField LAST_UPDATED_AT =
- NestedField.optional(10, "last_updated_at", LongType.get());
- public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
- NestedField.optional(11, "last_updated_snapshot_id", LongType.get());
-
- /**
- * Generates the partition stats file schema based on a combined partition type which considers
- * all specs in a table.
- *
- * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
- * Partitioning#partitionType(Table)}.
- * @return a schema that corresponds to the provided unified partition type.
- */
- public static Schema schema(StructType unifiedPartitionType) {
- Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
- return new Schema(
- NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
- SPEC_ID,
- DATA_RECORD_COUNT,
- DATA_FILE_COUNT,
- TOTAL_DATA_FILE_SIZE_IN_BYTES,
- POSITION_DELETE_RECORD_COUNT,
- POSITION_DELETE_FILE_COUNT,
- EQUALITY_DELETE_RECORD_COUNT,
- EQUALITY_DELETE_FILE_COUNT,
- TOTAL_RECORD_COUNT,
- LAST_UPDATED_AT,
- LAST_UPDATED_SNAPSHOT_ID);
- }
-
- /**
- * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
- *
- * @param table The {@link Table} for which the partition statistics is computed.
- * @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
- * present.
- */
- public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
- if (table.currentSnapshot() == null) {
- return null;
- }
-
- return computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId());
- }
-
- /**
- * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot.
- *
- * @param table The {@link Table} for which the partition statistics is computed.
- * @param snapshotId snapshot for which partition statistics are computed.
- * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are
- * present.
- */
- public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
- throws IOException {
- Snapshot snapshot = table.snapshot(snapshotId);
- Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);
-
- Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
- if (stats.isEmpty()) {
- return null;
- }
-
- StructType partitionType = Partitioning.partitionType(table);
- List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
- return writePartitionStatsFile(
- table, snapshot.snapshotId(), schema(partitionType), sortedStats);
- }
-
- @VisibleForTesting
- static PartitionStatisticsFile writePartitionStatsFile(
- Table table, long snapshotId, Schema dataSchema, Iterable<PartitionStats> records)
- throws IOException {
- FileFormat fileFormat =
- FileFormat.fromString(
- table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
-
- if (fileFormat == FileFormat.ORC) {
- // Internal writers are not supported for ORC yet. Temporary we go with AVRO.
- fileFormat = FileFormat.AVRO;
- }
-
- OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId);
-
- try (DataWriter<StructLike> writer = dataWriter(dataSchema, outputFile, fileFormat)) {
- records.iterator().forEachRemaining(writer::write);
- }
-
- return ImmutableGenericPartitionStatisticsFile.builder()
- .snapshotId(snapshotId)
- .path(outputFile.location())
- .fileSizeInBytes(outputFile.toInputFile().getLength())
- .build();
- }
-
- /**
- * Reads partition statistics from the specified {@link InputFile} using
given schema.
- *
- * @param schema The {@link Schema} of the partition statistics file.
- * @param inputFile An {@link InputFile} pointing to the partition stats file.
- */
- public static CloseableIterable<PartitionStats> readPartitionStatsFile(
- Schema schema, InputFile inputFile) {
- CloseableIterable<StructLike> records = dataReader(schema, inputFile);
- return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
- }
-
- private static OutputFile newPartitionStatsFile(
- Table table, FileFormat fileFormat, long snapshotId) {
- Preconditions.checkArgument(
- table instanceof HasTableOperations,
- "Table must have operations to retrieve metadata location");
-
- return table
- .io()
- .newOutputFile(
- ((HasTableOperations) table)
- .operations()
- .metadataFileLocation(
- fileFormat.addExtension(
- String.format(
- Locale.ROOT, "partition-stats-%d-%s", snapshotId,
UUID.randomUUID()))));
- }
-
- private static DataWriter<StructLike> dataWriter(
- Schema dataSchema, OutputFile outputFile, FileFormat fileFormat) throws IOException {
- switch (fileFormat) {
- case PARQUET:
- return Parquet.writeData(outputFile)
- .schema(dataSchema)
- .createWriterFunc(InternalWriter::createWriter)
- .withSpec(PartitionSpec.unpartitioned())
- .build();
- case ORC:
- // Internal writers are not supported for ORC yet. Temporary we go with AVRO.
- case AVRO:
- return Avro.writeData(outputFile)
- .schema(dataSchema)
- .createWriterFunc(org.apache.iceberg.avro.InternalWriter::create)
- .withSpec(PartitionSpec.unpartitioned())
- .build();
- default:
- throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name());
- }
- }
-
- private static CloseableIterable<StructLike> dataReader(Schema schema, InputFile inputFile) {
- FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
- Preconditions.checkArgument(
- fileFormat != null, "Unable to determine format of file: %s", inputFile.location());
-
- switch (fileFormat) {
- case PARQUET:
- return Parquet.read(inputFile)
- .project(schema)
- .createReaderFunc(
- fileSchema ->
- org.apache.iceberg.data.parquet.InternalReader.create(schema, fileSchema))
- .build();
- case ORC:
- // Internal writers are not supported for ORC yet. Temporary we go with AVRO.
- case AVRO:
- return Avro.read(inputFile)
- .project(schema)
- .createReaderFunc(fileSchema -> InternalReader.create(schema))
- .build();
- default:
- throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name());
- }
- }
-
- private static PartitionStats recordToPartitionStats(StructLike record) {
- PartitionStats stats =
- new PartitionStats(
- record.get(PARTITION_FIELD_ID, StructLike.class),
- record.get(SPEC_ID.fieldId(), Integer.class));
- stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
- stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
- stats.set(
- TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
- record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
- stats.set(
- POSITION_DELETE_RECORD_COUNT.fieldId(),
- record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
- stats.set(
- POSITION_DELETE_FILE_COUNT.fieldId(),
- record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
- stats.set(
- EQUALITY_DELETE_RECORD_COUNT.fieldId(),
- record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
- stats.set(
- EQUALITY_DELETE_FILE_COUNT.fieldId(),
- record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
- stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
- stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
- stats.set(
- LAST_UPDATED_SNAPSHOT_ID.fieldId(),
- record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
- return stats;
- }
-}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index 180e233c947..5d83155b4ea 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -46,7 +46,6 @@
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFields;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.iceberg.BaseMetastoreTableOperations;
-import org.apache.iceberg.BaseTable;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -57,6 +56,7 @@
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
@@ -441,7 +441,7 @@ static boolean isOrcFileFormat(org.apache.hadoop.hive.metastore.api.Table hmsTab
}
protected void setWriteModeDefaults(Table icebergTbl, Map<String, String> newProps, EnvironmentContext context) {
- if ((icebergTbl == null || ((BaseTable) icebergTbl).operations().current().formatVersion() == 1) &&
+ if ((icebergTbl == null || TableUtil.formatVersion(icebergTbl) == 1) &&
IcebergTableUtil.isV2TableOrAbove(newProps)) {
List<String> writeModeList = ImmutableList.of(
TableProperties.DELETE_MODE, TableProperties.UPDATE_MODE, TableProperties.MERGE_MODE);
@@ -473,7 +473,7 @@ public void postGetTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
if (hmsTable != null) {
try {
Table tbl = IcebergTableUtil.getTable(conf, hmsTable);
- String formatVersion = String.valueOf(((BaseTable) tbl).operations().current().formatVersion());
+ String formatVersion = String.valueOf(TableUtil.formatVersion(tbl));
hmsTable.getParameters().put(TableProperties.FORMAT_VERSION, formatVersion);
// Set the serde info
hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getName());
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index e4bc7f32cc4..7f77d7dde9e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -96,6 +96,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateProperties;
@@ -552,7 +553,7 @@ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTab
}
// we want to keep the data files but get rid of the metadata directory
- String metadataLocation = ((BaseTable) this.icebergTable).operations().current().metadataFileLocation();
+ String metadataLocation = TableUtil.metadataFileLocation(this.icebergTable);
try {
Path path = new Path(metadataLocation).getParent();
FileSystem.get(path.toUri(), conf).delete(path, true);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index cac9b480232..1956d8100b6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -72,7 +72,7 @@ private static HiveIcebergWriter writer(JobConf jc) {
}
private static void setWriterLevelConfiguration(JobConf jc, Table table) {
- final String writeFormat = table.properties().get("write.format.default");
+ final String writeFormat = table.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
if (writeFormat == null || "PARQUET".equalsIgnoreCase(writeFormat)) {
if (table.properties().get(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES) == null &&
jc.get(ParquetOutputFormat.BLOCK_SIZE) != null) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 7b1aabf9915..b53b9af266f 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -85,7 +85,6 @@
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -151,6 +150,7 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.PartitionStats;
+import org.apache.iceberg.PartitionStatsHandler;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
@@ -164,8 +164,9 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
+import org.apache.iceberg.TableUtil;
+import org.apache.iceberg.Transaction;
import org.apache.iceberg.actions.DeleteOrphanFiles;
-import org.apache.iceberg.data.PartitionStatsHandler;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Evaluator;
@@ -433,8 +434,8 @@ public boolean canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata.
public StorageFormatDescriptor getStorageFormatDescriptor(org.apache.hadoop.hive.metastore.api.Table table)
throws SemanticException {
if (table.getParameters() != null) {
- String format = table.getParameters().getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, IOConstants.PARQUET);
- return StorageFormat.getDescriptor(format, TableProperties.DEFAULT_FILE_FORMAT);
+ FileFormat format = IcebergTableUtil.defaultFileFormat(table.getParameters()::getOrDefault);
+ return StorageFormat.getDescriptor(format.name(), TableProperties.DEFAULT_FILE_FORMAT);
}
return null;
}
@@ -460,8 +461,8 @@ public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fr
Map<String, String> partitionSpec)
throws SemanticException {
Table icebergTbl = IcebergTableUtil.getTable(conf, table);
- String format = table.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT);
- HiveTableUtil.appendFiles(fromURI, format, icebergTbl, isOverwrite, partitionSpec, conf);
+ FileFormat format = IcebergTableUtil.defaultFileFormat(icebergTbl);
+ HiveTableUtil.appendFiles(fromURI, format.name(), icebergTbl, isOverwrite, partitionSpec, conf);
}
@Override
@@ -535,7 +536,19 @@ public Map<String, String> computeBasicStatistics(Partish partish) {
PartitionStatisticsFile statsFile = IcebergTableUtil.getPartitionStatsFile(table, snapshot.snapshotId());
if (statsFile == null) {
try {
- statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
+ Table statsTable = table;
+ if (FileFormat.ORC == IcebergTableUtil.defaultFileFormat(table)) {
+ // PartitionStatsHandler uses the table default file format for writing the stats file.
+ // ORC is not supported by InternalData writers, so we create an uncommitted transaction
+ // view of the table without DEFAULT_FILE_FORMAT to fall back to DEFAULT_FILE_FORMAT_DEFAULT.
+ // NOTE: we intentionally do not call commitTransaction(), so this property change is never published.
+ Transaction tx = table.newTransaction();
+ tx.updateProperties()
+ .remove(TableProperties.DEFAULT_FILE_FORMAT)
+ .commit();
+ statsTable = tx.table();
+ }
+ statsFile = PartitionStatsHandler.computeAndWriteStatsFile(statsTable);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
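The hunk above works around the missing ORC support in Iceberg's internal partition-stats writers: for ORC tables the stats are computed against an uncommitted transaction view of the table with DEFAULT_FILE_FORMAT removed, so the stats file falls back to the default format while the real table properties are never changed. A condensed sketch of that pattern (IOException handling omitted), assuming a loaded Table and the IcebergTableUtil.defaultFileFormat helper introduced later in this patch:

    Table statsTable = table;
    if (FileFormat.ORC == IcebergTableUtil.defaultFileFormat(table)) {
      // The property removal exists only in the transaction's table view; commitTransaction() is never called.
      Transaction tx = table.newTransaction();
      tx.updateProperties().remove(TableProperties.DEFAULT_FILE_FORMAT).commit();
      statsTable = tx.table();
    }
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(statsTable);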
@@ -553,7 +566,7 @@ private static Map<String, String> getPartishSummary(Partish partish, Table tabl
PartitionStatisticsFile statsFile = IcebergTableUtil.getPartitionStatsFile(table, snapshot.snapshotId());
if (statsFile != null) {
Types.StructType partitionType = Partitioning.partitionType(table);
- Schema recordSchema = PartitionStatsHandler.schema(partitionType);
+ Schema recordSchema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table));
try (CloseableIterable<PartitionStats> recordIterator =
PartitionStatsHandler.readPartitionStatsFile(
recordSchema, table.io().newInputFile(statsFile.path()))) {
@@ -1115,7 +1128,7 @@ public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
"try altering the metadata location to the current metadata location by executing the following query:" +
"ALTER TABLE {}.{} SET TBLPROPERTIES('metadata_location'='{}'). This operation is supported for Hive " +
"Catalog tables.", hmsTable.getDbName(), hmsTable.getTableName(),
- ((BaseTable) icebergTable).operations().current().metadataFileLocation());
+ TableUtil.metadataFileLocation(icebergTable));
AlterTableExecuteSpec.RollbackSpec rollbackSpec =
(AlterTableExecuteSpec.RollbackSpec) executeSpec.getOperationParams();
IcebergTableUtil.rollback(icebergTable, rollbackSpec.getRollbackType(), rollbackSpec.getParam());
@@ -1377,8 +1390,7 @@ public URI getURIForAuth(org.apache.hadoop.hive.metastore.api.Table hmsTable) th
.append(encodeString("/metadata/dummy.metadata.json"));
} else {
Table table = IcebergTableUtil.getTable(conf, hmsTable);
- authURI.append(getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(),
- hmsTable.getSd().getLocation()));
+ authURI.append(getPathForAuth(TableUtil.metadataFileLocation(table), hmsTable.getSd().getLocation()));
}
}
LOG.debug("Iceberg storage handler authorization URI {}", authURI);
@@ -1736,7 +1748,7 @@ private String collectColumnAndReplaceDummyValues(ExprNodeDesc node, String foun
private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) {
Schema tableSchema = SchemaParser.fromJson(tableProps.getProperty(InputFormatConfig.TABLE_SCHEMA));
- if (FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT)) ||
+ if (FileFormat.AVRO == IcebergTableUtil.defaultFileFormat(tableProps::getProperty) ||
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
hasOrcTimeInSchema(tableProps, tableSchema) ||
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
@@ -1755,10 +1767,11 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps)
* @return true if having time type column
*/
private static boolean hasOrcTimeInSchema(Properties tableProps, Schema tableSchema) {
- if (!FileFormat.ORC.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))) {
+ if (FileFormat.ORC != IcebergTableUtil.defaultFileFormat(tableProps::getProperty)) {
return false;
}
- return tableSchema.columns().stream().anyMatch(f -> Types.TimeType.get().typeId() == f.type().typeId());
+ return tableSchema.columns().stream()
+ .anyMatch(f -> Types.TimeType.get().typeId() == f.type().typeId());
}
/**
@@ -1770,7 +1783,7 @@ private static boolean hasOrcTimeInSchema(Properties tableProps, Schema tableSch
* @return true if having nested types
*/
private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps, Schema tableSchema) {
- if (!FileFormat.PARQUET.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT))) {
+ if (FileFormat.PARQUET != IcebergTableUtil.defaultFileFormat(tableProps::getProperty)) {
return true;
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 41e73e9211d..6a6b83f6cc1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -67,6 +67,7 @@
import org.apache.hadoop.util.Sets;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.ManifestFile;
@@ -440,6 +441,17 @@ private static Integer formatVersion(BinaryOperator<String> props) {
}
}
+ public static FileFormat defaultFileFormat(Table table) {
+ return defaultFileFormat(table.properties()::getOrDefault);
+ }
+
+ public static FileFormat defaultFileFormat(BinaryOperator<String> props) {
+ return FileFormat.fromString(
+ props.apply(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+ }
+
private static String getWriteModeDefault(BinaryOperator<String> props) {
return (isV2TableOrAbove(props) ? MERGE_ON_READ : COPY_ON_WRITE).modeName();
}
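The new IcebergTableUtil.defaultFileFormat overloads above centralize the write.format.default lookup (falling back to DEFAULT_FILE_FORMAT_DEFAULT, i.e. parquet) so callers compare FileFormat constants instead of raw strings. A short usage sketch, assuming a loaded Table named table and a java.util.Properties view of the table properties named tableProps:

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.mr.hive.IcebergTableUtil;

    // From a live table: resolves the table's default write format
    FileFormat tableFormat = IcebergTableUtil.defaultFileFormat(table);

    // From serialized properties, via the BinaryOperator<String> overload
    FileFormat propsFormat = IcebergTableUtil.defaultFileFormat(tableProps::getProperty);

    if (tableFormat == FileFormat.ORC) {
      // e.g. take the Avro fallback used by the partition-stats path above
    }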
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
deleted file mode 100644
index 753af7505ec..00000000000
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/TestPartitionStatsHandler.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.data;
-
-import java.io.File;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionData;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionStatisticsFile;
-import org.apache.iceberg.PartitionStats;
-import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.TestHelpers;
-import org.apache.iceberg.TestTables;
-import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.expressions.Literal;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-import org.assertj.core.api.Assumptions;
-import org.assertj.core.groups.Tuple;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import static org.apache.iceberg.data.PartitionStatsHandler.DATA_FILE_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.DATA_RECORD_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_FILE_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.EQUALITY_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_AT;
-import static org.apache.iceberg.data.PartitionStatsHandler.LAST_UPDATED_SNAPSHOT_ID;
-import static org.apache.iceberg.data.PartitionStatsHandler.PARTITION_FIELD_ID;
-import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_FILE_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.POSITION_DELETE_RECORD_COUNT;
-import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_DATA_FILE_SIZE_IN_BYTES;
-import static org.apache.iceberg.data.PartitionStatsHandler.TOTAL_RECORD_COUNT;
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-public class TestPartitionStatsHandler {
- private static final Schema SCHEMA =
- new Schema(
- optional(1, "c1", Types.IntegerType.get()),
- optional(2, "c2", Types.StringType.get()),
- optional(3, "c3", Types.StringType.get()));
-
- protected static final PartitionSpec SPEC =
- PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
-
- @TempDir
- public Path temp;
-
- private static final Random RANDOM = ThreadLocalRandom.current();
-
-// @Parameters(name = "fileFormat = {0}")
-// public static List<Object> parameters() {
-// return Arrays.asList(FileFormat.PARQUET, FileFormat.ORC, FileFormat.AVRO);
-// }
-
- public FileFormat format = FileFormat.AVRO;
-
- @Test
- public void testPartitionStatsOnEmptyTable() throws Exception {
- Table testTable = TestTables.create(tempDir("empty_table"), "empty_table",
SCHEMA, SPEC, 2);
-
assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable)).isNull();
- }
-
- @Test
- public void testPartitionStatsOnEmptyBranch() throws Exception {
- Table testTable = TestTables.create(tempDir("empty_branch"),
"empty_branch", SCHEMA, SPEC, 2);
- testTable.manageSnapshots().createBranch("b1").commit();
- long branchSnapshot = testTable.refs().get("b1").snapshotId();
- assertThat(PartitionStatsHandler.computeAndWriteStatsFile(testTable, branchSnapshot)).isNull();
- }
-
- @Test
- public void testPartitionStatsOnInvalidSnapshot() throws Exception {
- Table testTable =
- TestTables.create(tempDir("invalid_snapshot"), "invalid_snapshot",
SCHEMA, SPEC, 2);
- assertThatThrownBy(() ->
PartitionStatsHandler.computeAndWriteStatsFile(testTable, 42L))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Snapshot not found: 42");
- }
-
- @Test
- public void testPartitionStatsOnUnPartitionedTable() throws Exception {
- Table testTable =
- TestTables.create(
- tempDir("unpartitioned_table"),
- "unpartitioned_table",
- SCHEMA,
- PartitionSpec.unpartitioned(),
- 2);
-
- List<Record> records = prepareRecords(testTable.schema());
- DataFile dataFile = FileHelpers.writeDataFile(testTable, outputFile(), records);
- testTable.newAppend().appendFile(dataFile).commit();
-
- assertThatThrownBy(() -> PartitionStatsHandler.computeAndWriteStatsFile(testTable))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("table must be partitioned");
- }
-
- @Test
- public void testAllDatatypePartitionWriting() throws Exception {
- Schema schema =
- new Schema(
- required(100, "id", Types.LongType.get()),
- optional(101, "data", Types.StringType.get()),
- required(102, "b", Types.BooleanType.get()),
- optional(103, "i", Types.IntegerType.get()),
- required(104, "l", Types.LongType.get()),
- optional(105, "f", Types.FloatType.get()),
- required(106, "d", Types.DoubleType.get()),
- optional(107, "date", Types.DateType.get()),
- required(108, "ts", Types.TimestampType.withoutZone()),
- required(110, "s", Types.StringType.get()),
- required(111, "uuid", Types.UUIDType.get()),
- required(112, "fixed", Types.FixedType.ofLength(7)),
- optional(113, "bytes", Types.BinaryType.get()),
- required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
- required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
- required(116, "dec_38_10", Types.DecimalType.of(38, 10)), //
maximum precision
- required(117, "time", Types.TimeType.get()));
-
- PartitionSpec spec =
- PartitionSpec.builderFor(schema)
- .identity("b")
- .identity("i")
- .identity("l")
- .identity("f")
- .identity("d")
- .identity("date")
- .identity("ts")
- .identity("s")
- .identity("uuid")
- .identity("fixed")
- .identity("bytes")
- .identity("dec_9_0")
- .identity("dec_11_2")
- .identity("dec_38_10")
- .identity("time")
- .build();
-
- Table testTable =
- TestTables.create(
- tempDir("test_all_type"), "test_all_type", schema, spec,
SortOrder.unsorted(), 2);
-
- Types.StructType partitionSchema = Partitioning.partitionType(testTable);
- Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
-
- PartitionData partitionData =
- new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
- partitionData.set(0, true);
- partitionData.set(1, 42);
- partitionData.set(2, 42L);
- partitionData.set(3, 3.14f);
- partitionData.set(4, 3.141592653589793);
- partitionData.set(5, Literal.of("2022-01-01").to(Types.DateType.get()).value());
- partitionData.set(
- 6, Literal.of("2017-12-01T10:12:55.038194").to(Types.TimestampType.withoutZone()).value());
- partitionData.set(7, "string");
- partitionData.set(8, UUID.randomUUID());
- partitionData.set(9, ByteBuffer.wrap(new byte[] {0, 1, 2, 3, 4, 5, 6}));
- partitionData.set(10, ByteBuffer.wrap(new byte[] {1, 2, 3}));
- partitionData.set(11, new BigDecimal("123456789"));
- partitionData.set(12, new BigDecimal("1234567.89"));
- partitionData.set(13, new BigDecimal("12345678901234567890.1234567890"));
- partitionData.set(14, Literal.of("10:10:10").to(Types.TimeType.get()).value());
-
- PartitionStats partitionStats = new PartitionStats(partitionData, RANDOM.nextInt(10));
- partitionStats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong());
- partitionStats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt());
- partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20));
- List<PartitionStats> expected = Collections.singletonList(partitionStats);
- PartitionStatisticsFile statisticsFile =
- PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected);
-
- List<PartitionStats> written;
- try (CloseableIterable<PartitionStats> recordIterator =
- PartitionStatsHandler.readPartitionStatsFile(
- dataSchema, Files.localInput(statisticsFile.path()))) {
- written = Lists.newArrayList(recordIterator);
- }
-
- assertThat(written).hasSize(expected.size());
- Comparator<StructLike> comparator = Comparators.forType(partitionSchema);
- for (int i = 0; i < written.size(); i++) {
- assertThat(isEqual(comparator, written.get(i), expected.get(i))).isTrue();
- }
- }
-
- @Test
- public void testOptionalFieldsWriting() throws Exception {
- PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
- Table testTable =
- TestTables.create(
- tempDir("test_partition_stats_optional"),
- "test_partition_stats_optional",
- SCHEMA,
- spec,
- SortOrder.unsorted(),
- 2);
-
- Types.StructType partitionSchema = Partitioning.partitionType(testTable);
- Schema dataSchema = PartitionStatsHandler.schema(partitionSchema);
-
- ImmutableList.Builder<PartitionStats> partitionListBuilder = ImmutableList.builder();
- for (int i = 0; i < 5; i++) {
- PartitionData partitionData =
- new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType());
- partitionData.set(0, RANDOM.nextInt());
-
- PartitionStats stats = new PartitionStats(partitionData, RANDOM.nextInt(10));
- stats.set(PARTITION_FIELD_ID, partitionData);
- stats.set(DATA_RECORD_COUNT.fieldId(), RANDOM.nextLong());
- stats.set(DATA_FILE_COUNT.fieldId(), RANDOM.nextInt());
- stats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), 1024L * RANDOM.nextInt(20));
- stats.set(POSITION_DELETE_RECORD_COUNT.fieldId(), null);
- stats.set(POSITION_DELETE_FILE_COUNT.fieldId(), null);
- stats.set(EQUALITY_DELETE_RECORD_COUNT.fieldId(), null);
- stats.set(EQUALITY_DELETE_FILE_COUNT.fieldId(), null);
- stats.set(TOTAL_RECORD_COUNT.fieldId(), null);
- stats.set(LAST_UPDATED_AT.fieldId(), null);
- stats.set(LAST_UPDATED_SNAPSHOT_ID.fieldId(), null);
-
- partitionListBuilder.add(stats);
- }
-
- List<PartitionStats> expected = partitionListBuilder.build();
-
- assertThat(expected.get(0))
- .extracting(
- PartitionStats::positionDeleteRecordCount,
- PartitionStats::positionDeleteFileCount,
- PartitionStats::equalityDeleteRecordCount,
- PartitionStats::equalityDeleteFileCount,
- PartitionStats::totalRecords,
- PartitionStats::lastUpdatedAt,
- PartitionStats::lastUpdatedSnapshotId)
- .isEqualTo(
- Arrays.asList(
- 0L, 0, 0L, 0, null, null, null)); // null counters must be initialized to zero.
-
- PartitionStatisticsFile statisticsFile =
- PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected);
-
- List<PartitionStats> written;
- try (CloseableIterable<PartitionStats> recordIterator =
- PartitionStatsHandler.readPartitionStatsFile(
- dataSchema, Files.localInput(statisticsFile.path()))) {
- written = Lists.newArrayList(recordIterator);
- }
-
- assertThat(written).hasSize(expected.size());
- Comparator<StructLike> comparator = Comparators.forType(partitionSchema);
- for (int i = 0; i < written.size(); i++) {
- assertThat(isEqual(comparator, written.get(i), expected.get(i))).isTrue();
- }
- }
-
- @SuppressWarnings("checkstyle:MethodLength")
- @Test // Tests for all the table formats (PARQUET, ORC, AVRO)
- public void testPartitionStats() throws Exception {
- Assumptions.assumeThat(format)
- .as("ORC internal readers and writers are not supported")
- .isNotEqualTo(FileFormat.ORC);
-
- Table testTable =
- TestTables.create(
- tempDir("partition_stats_" + format.name()),
- "partition_stats_compute_" + format.name(),
- SCHEMA,
- SPEC,
- 2,
- ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
-
- List<Record> records = prepareRecords(testTable.schema());
- DataFile dataFile1 =
- FileHelpers.writeDataFile(
- testTable, outputFile(), TestHelpers.Row.of("foo", "A"),
records.subList(0, 3));
- DataFile dataFile2 =
- FileHelpers.writeDataFile(
- testTable, outputFile(), TestHelpers.Row.of("foo", "B"),
records.subList(3, 4));
- DataFile dataFile3 =
- FileHelpers.writeDataFile(
- testTable, outputFile(), TestHelpers.Row.of("bar", "A"),
records.subList(4, 5));
- DataFile dataFile4 =
- FileHelpers.writeDataFile(
- testTable, outputFile(), TestHelpers.Row.of("bar", "B"),
records.subList(5, 7));
-
- for (int i = 0; i < 3; i++) {
- // insert same set of seven records thrice to have a new manifest files
- testTable
- .newAppend()
- .appendFile(dataFile1)
- .appendFile(dataFile2)
- .appendFile(dataFile3)
- .appendFile(dataFile4)
- .commit();
- }
-
- Snapshot snapshot1 = testTable.currentSnapshot();
- Schema recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
- Types.StructType partitionType =
- recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
- computeAndValidatePartitionStats(
- testTable,
- recordSchema,
- Tuple.tuple(
- partitionRecord(partitionType, "foo", "A"),
- 0,
- 9L,
- 3,
- 3 * dataFile1.fileSizeInBytes(),
- 0L,
- 0,
- 0L,
- 0,
- null,
- snapshot1.timestampMillis(),
- snapshot1.snapshotId()),
- Tuple.tuple(
- partitionRecord(partitionType, "foo", "B"),
- 0,
- 3L,
- 3,
- 3 * dataFile2.fileSizeInBytes(),
- 0L,
- 0,
- 0L,
- 0,
- null,
- snapshot1.timestampMillis(),
- snapshot1.snapshotId()),
- Tuple.tuple(
- partitionRecord(partitionType, "bar", "A"),
- 0,
- 3L,
- 3,
- 3 * dataFile3.fileSizeInBytes(),
- 0L,
- 0,
- 0L,
- 0,
- null,
- snapshot1.timestampMillis(),
- snapshot1.snapshotId()),
- Tuple.tuple(
- partitionRecord(partitionType, "bar", "B"),
- 0,
- 6L,
- 3,
- 3 * dataFile4.fileSizeInBytes(),
- 0L,
- 0,
- 0L,
- 0,
- null,
- snapshot1.timestampMillis(),
- snapshot1.snapshotId()));
-
- DeleteFile posDeletes = commitPositionDeletes(testTable, dataFile1);
- Snapshot snapshot2 = testTable.currentSnapshot();
-
- DeleteFile eqDeletes = commitEqualityDeletes(testTable);
- Snapshot snapshot3 = testTable.currentSnapshot();
-
- recordSchema = PartitionStatsHandler.schema(Partitioning.partitionType(testTable));
- partitionType = recordSchema.findField(PARTITION_FIELD_ID).type().asStructType();
- computeAndValidatePartitionStats(
- testTable,
- recordSchema,
- Tuple.tuple(
- partitionRecord(partitionType, "foo", "A"),
- 0,
- 9L,
- 3,
- 3 * dataFile1.fileSizeInBytes(),
- 0L,
- 0,
- eqDeletes.recordCount(),
- 1,
- null,
- snapshot3.timestampMillis(),
- snapshot3.snapshotId()),
- Tuple.tuple(
- partitionRecord(partitionType, "foo", "B"),
- 0,
- 3L,
- 3,
- 3 * dataFile2.fileSizeInBytes(),
- 0L,
- 0,
- 0L,
- 0,
- null,
- snapshot1.timestampMillis(),
- snapshot1.snapshotId()),
- Tuple.tuple(
- partitionRecord(partitionType, "bar", "A"),
- 0,
- 3L,
- 3,
- 3 * dataFile3.fileSizeInBytes(),
- posDeletes.recordCount(),
- 1,
- 0L,
- 0,
- null,
- snapshot2.timestampMillis(),
- snapshot2.snapshotId()),
- Tuple.tuple(
- partitionRecord(partitionType, "bar", "B"),
- 0,
- 6L,
- 3,
- 3 * dataFile4.fileSizeInBytes(),
- 0L,
- 0,
- 0L,
- 0,
- null,
- snapshot1.timestampMillis(),
- snapshot1.snapshotId()));
- }
-
- private OutputFile outputFile() throws IOException {
- return Files.localOutput(File.createTempFile("data", null,
tempDir("stats")));
- }
-
- private static StructLike partitionRecord(
- Types.StructType partitionType, String val1, String val2) {
- GenericRecord record = GenericRecord.create(partitionType);
- record.set(0, val1);
- record.set(1, val2);
- return record;
- }
-
- private static List<Record> prepareRecords(Schema schema) {
- GenericRecord record = GenericRecord.create(schema);
- List<Record> records = Lists.newArrayList();
- // foo 4 records, bar 3 records
- // foo, A -> 3 records
- records.add(record.copy("c1", 0, "c2", "foo", "c3", "A"));
- records.add(record.copy("c1", 1, "c2", "foo", "c3", "A"));
- records.add(record.copy("c1", 2, "c2", "foo", "c3", "A"));
- // foo, B -> 1 record
- records.add(record.copy("c1", 3, "c2", "foo", "c3", "B"));
- // bar, A -> 1 record
- records.add(record.copy("c1", 4, "c2", "bar", "c3", "A"));
- // bar, B -> 2 records
- records.add(record.copy("c1", 5, "c2", "bar", "c3", "B"));
- records.add(record.copy("c1", 6, "c2", "bar", "c3", "B"));
- return records;
- }
-
- private static void computeAndValidatePartitionStats(
- Table testTable, Schema recordSchema, Tuple... expectedValues) throws IOException {
- // compute and commit partition stats file
- Snapshot currentSnapshot = testTable.currentSnapshot();
- PartitionStatisticsFile result = PartitionStatsHandler.computeAndWriteStatsFile(testTable);
- testTable.updatePartitionStatistics().setPartitionStatistics(result).commit();
- assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId());
-
- // read the partition entries from the stats file
- List<PartitionStats> partitionStats;
- try (CloseableIterable<PartitionStats> recordIterator =
- PartitionStatsHandler.readPartitionStatsFile(
- recordSchema, Files.localInput(result.path()))) {
- partitionStats = Lists.newArrayList(recordIterator);
- }
-
- assertThat(partitionStats)
- .extracting(
- PartitionStats::partition,
- PartitionStats::specId,
- PartitionStats::dataRecordCount,
- PartitionStats::dataFileCount,
- PartitionStats::totalDataFileSizeInBytes,
- PartitionStats::positionDeleteRecordCount,
- PartitionStats::positionDeleteFileCount,
- PartitionStats::equalityDeleteRecordCount,
- PartitionStats::equalityDeleteFileCount,
- PartitionStats::totalRecords,
- PartitionStats::lastUpdatedAt,
- PartitionStats::lastUpdatedSnapshotId)
- .containsExactlyInAnyOrder(expectedValues);
- }
-
- private DeleteFile commitEqualityDeletes(Table testTable) throws IOException {
- Schema deleteRowSchema = testTable.schema().select("c1");
- Record dataDelete = GenericRecord.create(deleteRowSchema);
- List<Record> dataDeletes =
- Lists.newArrayList(dataDelete.copy("c1", 1), dataDelete.copy("c1", 2));
-
- DeleteFile eqDeletes =
- FileHelpers.writeDeleteFile(
- testTable,
- Files.localOutput(File.createTempFile("junit", null,
tempDir("eq_delete"))),
- TestHelpers.Row.of("foo", "A"),
- dataDeletes,
- deleteRowSchema);
- testTable.newRowDelta().addDeletes(eqDeletes).commit();
- return eqDeletes;
- }
-
- private DeleteFile commitPositionDeletes(Table testTable, DataFile dataFile1) throws IOException {
- List<PositionDelete<?>> deletes = Lists.newArrayList();
- for (long i = 0; i < 2; i++) {
- deletes.add(
- positionDelete(testTable.schema(), dataFile1.location(), i, (int) i, String.valueOf(i)));
- }
-
- DeleteFile posDeletes =
- FileHelpers.writePosDeleteFile(
- testTable,
- Files.localOutput(File.createTempFile("junit", null,
tempDir("pos_delete"))),
- TestHelpers.Row.of("bar", "A"),
- deletes);
- testTable.newRowDelta().addDeletes(posDeletes).commit();
- return posDeletes;
- }
-
- private static PositionDelete<GenericRecord> positionDelete(
- Schema tableSchema, CharSequence path, Long position, Object... values) {
- PositionDelete<GenericRecord> posDelete = PositionDelete.create();
- GenericRecord nested = GenericRecord.create(tableSchema);
- for (int i = 0; i < values.length; i++) {
- nested.set(i, values[i]);
- }
-
- posDelete.set(path, position, nested);
- return posDelete;
- }
-
- private File tempDir(String folderName) throws IOException {
- return java.nio.file.Files.createTempDirectory(temp.toAbsolutePath(), folderName).toFile();
- }
-
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
- private static boolean isEqual(
- Comparator<StructLike> partitionComparator, PartitionStats stats1, PartitionStats stats2) {
- if (stats1 == stats2) {
- return true;
- } else if (stats1 == null || stats2 == null) {
- return false;
- }
-
- return partitionComparator.compare(stats1.partition(), stats2.partition()) == 0 &&
- stats1.specId() == stats2.specId() &&
- stats1.dataRecordCount() == stats2.dataRecordCount() &&
- stats1.dataFileCount() == stats2.dataFileCount() &&
- stats1.totalDataFileSizeInBytes() == stats2.totalDataFileSizeInBytes() &&
- stats1.positionDeleteRecordCount() == stats2.positionDeleteRecordCount() &&
- stats1.positionDeleteFileCount() == stats2.positionDeleteFileCount() &&
- stats1.equalityDeleteRecordCount() == stats2.equalityDeleteRecordCount() &&
- stats1.equalityDeleteFileCount() == stats2.equalityDeleteFileCount() &&
- Objects.equals(stats1.totalRecords(), stats2.totalRecords()) &&
- Objects.equals(stats1.lastUpdatedAt(), stats2.lastUpdatedAt()) &&
- Objects.equals(stats1.lastUpdatedSnapshotId(), stats2.lastUpdatedSnapshotId());
- }
-}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java
index 4f6fb9699f9..6ba9a04d1dd 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergRollback.java
@@ -22,9 +22,9 @@
import java.io.IOException;
import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.Assert;
import org.junit.Assume;
@@ -99,7 +99,7 @@ public void testRevertRollback() throws IOException, InterruptedException {
Table table = testTables.createTableWithVersions(shell, identifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);
- String metadataLocationBeforeRollback = ((BaseTable) table).operations().current().metadataFileLocation();
+ String metadataLocationBeforeRollback = TableUtil.metadataFileLocation(table);
shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE
ROLLBACK(" +
table.history().get(0).snapshotId() + ")");
Assert.assertEquals(3, shell.executeStatement("SELECT * FROM " +
identifier.name()).size());
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 5e66f795a7e..b6d39bd01a2 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -59,6 +59,7 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
@@ -1263,7 +1264,7 @@ public void testCreateTableWithFormatV2ThroughTableProperty() {
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
Assert.assertEquals("should create table using format v2",
- 2, ((BaseTable) icebergTable).operations().current().formatVersion());
+ 2, TableUtil.formatVersion(icebergTable));
}
@Test
@@ -1523,7 +1524,7 @@ public void testAuthzURI(boolean masked) throws TException, InterruptedException
URI uriForAuth = storageHandler.getURIForAuth(hmsTable);
String metadataLocation =
- storageHandler.getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(),
+ storageHandler.getPathForAuth(TableUtil.metadataFileLocation(table),
hmsTable.getSd().getLocation());
if (masked) {
@@ -1563,7 +1564,7 @@ public void testAuthzURIWithAuthEnabledWithMetadataLocation(boolean masked) thro
PartitionSpec.unpartitioned(), FileFormat.PARQUET, ImmutableList.of());
String metadataFileLocation =
- URI.create(((BaseTable) sourceTable).operations().current().metadataFileLocation()).getPath();
+ URI.create(TableUtil.metadataFileLocation(sourceTable)).getPath();
TableIdentifier target = TableIdentifier.of("default", "target");
Table targetTable = testTables.createTable(shell, target.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
@@ -1618,7 +1619,7 @@ public void testAuthzURIWithAuthEnabledAndMockCommandAuthorizer(boolean masked)
HiveIcebergStorageHandler storageHandler = new HiveIcebergStorageHandler();
storageHandler.setConf(shell.getHiveConf());
String metadataLocation = HiveConf.EncoderDecoderFactory.URL_ENCODER_DECODER.decode(
- storageHandler.getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(),
+ storageHandler.getPathForAuth(TableUtil.metadataFileLocation(table),
hmsTable.getSd().getLocation()));
if (masked) {
@@ -1657,7 +1658,7 @@ public void testAuthzURIWithAuthEnabled(boolean masked) throws TException, Inter
storageHandler.setConf(shell.getHiveConf());
URI uriForAuth = storageHandler.getURIForAuth(hmsTable);
String metadataLocation =
- storageHandler.getPathForAuth(((BaseTable) table).operations().current().metadataFileLocation(),
+ storageHandler.getPathForAuth(TableUtil.metadataFileLocation(table),
hmsTable.getSd().getLocation());
if (masked) {
@@ -1679,7 +1680,7 @@ public void testCreateTableWithMetadataLocation() throws IOException {
ImmutableMap.<String, String>builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build());
testTables.appendIcebergTable(shell.getHiveConf(), sourceTable, FileFormat.PARQUET, null,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
- String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation();
+ String metadataLocation = TableUtil.metadataFileLocation(sourceTable);
shell.executeStatement("DROP TABLE " + sourceIdentifier.name());
TableIdentifier targetIdentifier = TableIdentifier.of("default", "target");
Table targetTable =
@@ -1687,7 +1688,7 @@ public void testCreateTableWithMetadataLocation() throws IOException {
PartitionSpec.unpartitioned(), FileFormat.PARQUET, Collections.emptyList(), 1,
ImmutableMap.<String, String>builder().put("metadata_location", metadataLocation).build()
);
- Assert.assertEquals(metadataLocation, ((BaseTable) targetTable).operations().current().metadataFileLocation());
+ Assert.assertEquals(metadataLocation, TableUtil.metadataFileLocation(targetTable));
List<Object[]> rows = shell.executeStatement("SELECT * FROM " + targetIdentifier.name());
List<Record> records = HiveIcebergTestUtils.valueForRow(targetTable.schema(), rows);
HiveIcebergTestUtils.validateData(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, records, 0);
@@ -1712,11 +1713,11 @@ public void testAlterTableWithMetadataLocation() throws IOException {
PartitionSpec.unpartitioned(), FileFormat.PARQUET, Collections.emptyList(), 1, Collections.emptyMap());
testTables.appendIcebergTable(shell.getHiveConf(), table, FileFormat.PARQUET, null,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
- String firstMetadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
+ String firstMetadataLocation = TableUtil.metadataFileLocation(table);
testTables.appendIcebergTable(shell.getHiveConf(), table, FileFormat.PARQUET, null,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
table.refresh();
- String secondMetadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
+ String secondMetadataLocation = TableUtil.metadataFileLocation(table);
Assert.assertNotEquals(firstMetadataLocation, secondMetadataLocation);
shell.executeStatement("ALTER TABLE " + tableIdentifier.name() + " SET
TBLPROPERTIES('metadata_location'='" +
firstMetadataLocation + "')");
@@ -1743,7 +1744,7 @@ public void testAlterTableWithMetadataLocationFromAnotherTable() throws IOExcept
ImmutableMap.<String, String>builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build());
testTables.appendIcebergTable(shell.getHiveConf(), sourceTable, FileFormat.PARQUET, null,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
- String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation();
+ String metadataLocation = TableUtil.metadataFileLocation(sourceTable);
shell.executeStatement("DROP TABLE " + sourceIdentifier.name());
TableIdentifier targetIdentifier = TableIdentifier.of("default", "target");
testTables.createTable(shell, targetIdentifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
@@ -1912,7 +1913,7 @@ public void testConcurrentIcebergCommitsAndHiveAlterTableCalls() throws Exceptio
executorService.awaitTermination(1, TimeUnit.MINUTES);
// Verify that the insert was effective
- Assert.assertEquals(((BaseTable) testTables.loadTable(identifier)).operations().current().metadataFileLocation(),
+ Assert.assertEquals(TableUtil.metadataFileLocation(testTables.loadTable(identifier)),
(long) HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size(),
shell.executeStatement("select count(*) from customers").get(0)[0]
);
@@ -1937,7 +1938,7 @@ public void testCreateTableWithMetadataLocationWithoutSchema() throws IOExceptio
testTables.createTable(shell, sourceIdentifier.name(),
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, FileFormat.PARQUET, records, 1,
ImmutableMap.<String, String>builder().put(InputFormatConfig.EXTERNAL_TABLE_PURGE, "FALSE").build());
- String metadataLocation = ((BaseTable) sourceTable).operations().current().metadataFileLocation();
+ String metadataLocation = TableUtil.metadataFileLocation(sourceTable);
shell.executeStatement("DROP TABLE " + sourceIdentifier.name());
TableIdentifier targetIdentifier = TableIdentifier.of("default", "target");
diff --git a/itests/hive-iceberg/pom.xml b/itests/hive-iceberg/pom.xml
index e9812a6b270..70bdb9bc95a 100644
--- a/itests/hive-iceberg/pom.xml
+++ b/itests/hive-iceberg/pom.xml
@@ -27,7 +27,7 @@
<hive.path.to.root>../..</hive.path.to.root>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<log4j2.debug>false</log4j2.debug>
- <iceberg.version>1.9.1</iceberg.version>
+ <iceberg.version>1.10.0</iceberg.version>
</properties>
<dependencies>
<dependency>
diff --git a/pom.xml b/pom.xml
index 9dee9a882ee..0d9fb17e16f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,7 +202,7 @@
<protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version>
<protoc.path>${env.PROTOC_PATH}</protoc.path>
<rat.version>0.16.1</rat.version>
- <roaringbit.version>1.2.1</roaringbit.version>
+ <roaringbit.version>1.3.0</roaringbit.version>
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.30</slf4j.version>
<ST4.version>4.0.4</ST4.version>