This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 742ea4f5152 HIVE-29205: Iceberg: Upgrade iceberg version to 1.10.0 (#6227)
742ea4f5152 is described below
commit 742ea4f5152f1fa910c015c03ba904da1bb4a9e6
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue Dec 9 14:58:48 2025 +0100
HIVE-29205: Iceberg: Upgrade iceberg version to 1.10.0 (#6227)
---
.../iceberg/hive/HMSTablePropertyHelper.java | 104 ++--
.../java/org/apache/iceberg/hive/HiveCatalog.java | 2 +-
.../apache/iceberg/hive/HiveOperationsBase.java | 121 +++--
.../apache/iceberg/hive/HiveTableOperations.java | 70 ++-
.../apache/iceberg/hive/HiveViewOperations.java | 175 +++----
.../org/apache/iceberg/hive/HiveTableTest.java | 2 +-
...veTableBaseTest.java => HiveTableTestBase.java} | 2 +-
.../org/apache/iceberg/hive/TestHiveCatalog.java | 2 +-
.../org/apache/iceberg/hive/TestHiveCommits.java | 2 +-
...leTest.java => TestHiveCreateReplaceTable.java} | 2 +-
.../iceberg/hive/TestHiveTableConcurrency.java | 2 +-
.../describe_iceberg_metadata_tables.q.out | 36 +-
iceberg/patched-iceberg-api/pom.xml | 6 -
.../org/apache/iceberg/util/StructProjection.java | 225 ---------
iceberg/patched-iceberg-core/pom.xml | 4 -
.../org/apache/iceberg/ManifestFilterManager.java | 546 ---------------------
.../java/org/apache/iceberg/PartitionsTable.java | 333 -------------
.../org/apache/iceberg/util/StructLikeMap.java | 187 -------
.../org/apache/iceberg/util/StructLikeWrapper.java | 103 ----
iceberg/pom.xml | 2 +-
pom.xml | 8 +-
.../metastore-rest-catalog/pom.xml | 2 +-
22 files changed, 291 insertions(+), 1645 deletions(-)
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
index 45253e1dd8d..f7de4dd0719 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
@@ -45,6 +45,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.view.ViewMetadata;
import org.apache.parquet.Strings;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.slf4j.Logger;
@@ -54,34 +55,34 @@
public class HMSTablePropertyHelper {
private static final Logger LOG =
LoggerFactory.getLogger(HMSTablePropertyHelper.class);
- public static final String HIVE_ICEBERG_STORAGE_HANDLER =
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
+ public static final String HIVE_ICEBERG_STORAGE_HANDLER =
+ "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
public static final String PARTITION_SPEC =
"iceberg.mr.table.partition.spec";
+ /**
+ * Provides key translation where necessary between Iceberg and HMS props. This translation is
+ * needed because some properties control the same behaviour but are named differently in Iceberg
+ * and Hive. Therefore changes to these property pairs should be synchronized.
+ *
+ * <p>Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and
+ * external.table.purge=true in Hive. Hive and Iceberg users are unaware of each other's control
+ * flags, therefore inconsistent behaviour can occur from e.g. a Hive user's point of view if
+ * external.table.purge=true is set on the HMS table but gc.enabled=false is set on the Iceberg
+ * table, resulting in no data file deletion.
+ */
private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
ImmutableBiMap.of(
- // gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things
- // but with different names
- GC_ENABLED, "external.table.purge", TableProperties.PARQUET_COMPRESSION, ParquetOutputFormat.COMPRESSION,
- TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, ParquetOutputFormat.BLOCK_SIZE);
+ GC_ENABLED, "external.table.purge",
+ TableProperties.PARQUET_COMPRESSION, ParquetOutputFormat.COMPRESSION,
+ TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, ParquetOutputFormat.BLOCK_SIZE
+ );
private HMSTablePropertyHelper() {
}
- /**
- * Provides key translation where necessary between Iceberg and HMS props. This translation is needed because some
- * properties control the same behaviour but are named differently in Iceberg and Hive. Therefore, changes to these
- * property pairs should be synchronized.
- *
- * Example: Deleting data files upon DROP TABLE is enabled using gc.enabled=true in Iceberg and
- * external.table.purge=true in Hive. Hive and Iceberg users are unaware of each other's control flags, therefore
- * inconsistent behaviour can occur from e.g. a Hive user's point of view if external.table.purge=true is set on the
- * HMS table but gc.enabled=false is set on the Iceberg table, resulting in no data file deletion.
- *
- * @param hmsProp The HMS property that should be translated to Iceberg property
- * @return Iceberg property equivalent to the hmsProp. If no such translation exists, the original hmsProp is returned
- */
- public static String translateToIcebergProp(String hmsProp) {
+ static String translateToIcebergProp(String hmsProp) {
return ICEBERG_TO_HMS_TRANSLATION.inverse().getOrDefault(hmsProp, hmsProp);
}
+
/** Updates the HMS Table properties based on the Iceberg Table metadata. */
public static void updateHmsTableForIcebergTable(
String newMetadataLocation,
@@ -136,6 +137,33 @@ public static void updateHmsTableForIcebergTable(
tbl.setParameters(parameters);
}
+ /** Updates the HMS Table properties based on the Iceberg View metadata. */
+ public static void updateHmsTableForIcebergView(
+ String newMetadataLocation,
+ Table tbl,
+ ViewMetadata metadata,
+ Set<String> obsoleteProps,
+ long maxHiveTablePropertySize,
+ String currentLocation) {
+ Map<String, String> parameters =
+ Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
+
+ // push all Iceberg view properties into HMS
+ metadata.properties().entrySet().stream()
+ .filter(entry ->
!entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
+ .forEach(entry -> parameters.put(entry.getKey(), entry.getValue()));
+ setCommonParameters(
+ newMetadataLocation,
+ metadata.uuid(),
+ obsoleteProps,
+ currentLocation,
+ parameters,
+ HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+ metadata.schema(),
+ maxHiveTablePropertySize);
+ tbl.setParameters(parameters);
+ }
+
public static SortOrder getSortOrder(Properties props, Schema schema) {
String sortOrderJsonString =
props.getProperty(TableProperties.DEFAULT_SORT_ORDER);
return Strings.isNullOrEmpty(sortOrderJsonString) ? SortOrder.unsorted() :
SortOrderParser.fromJson(schema,
@@ -168,8 +196,7 @@ private static void setCommonParameters(
setSchema(schema, parameters, maxHiveTablePropertySize);
}
- @VisibleForTesting
- static void setStorageHandler(Map<String, String> parameters, boolean
hiveEngineEnabled) {
+ private static void setStorageHandler(Map<String, String> parameters,
boolean hiveEngineEnabled) {
// If needed, set the 'storage_handler' property to enable query from Hive
if (hiveEngineEnabled) {
parameters.put(hive_metastoreConstants.META_TABLE_STORAGE,
HIVE_ICEBERG_STORAGE_HANDLER);
@@ -179,7 +206,8 @@ static void setStorageHandler(Map<String, String> parameters, boolean hiveEngine
}
@VisibleForTesting
- static void setSnapshotStats(TableMetadata metadata, Map<String, String>
parameters, long maxHiveTablePropertySize) {
+ static void setSnapshotStats(
+ TableMetadata metadata, Map<String, String> parameters, long
maxHiveTablePropertySize) {
parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);
@@ -188,17 +216,15 @@ static void setSnapshotStats(TableMetadata metadata, Map<String, String> paramet
if (exposeInHmsProperties(maxHiveTablePropertySize) && currentSnapshot !=
null) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_ID,
String.valueOf(currentSnapshot.snapshotId()));
parameters.put(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP,
String.valueOf(currentSnapshot.timestampMillis()));
+
setSnapshotSummary(parameters, currentSnapshot,
maxHiveTablePropertySize);
}
-
parameters.put(TableProperties.SNAPSHOT_COUNT,
String.valueOf(metadata.snapshots().size()));
}
@VisibleForTesting
static void setSnapshotSummary(
- Map<String, String> parameters,
- Snapshot currentSnapshot,
- long maxHiveTablePropertySize) {
+ Map<String, String> parameters, Snapshot currentSnapshot, long
maxHiveTablePropertySize) {
try {
String summary =
JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
if (summary.length() <= maxHiveTablePropertySize) {
@@ -208,14 +234,17 @@ static void setSnapshotSummary(
currentSnapshot.snapshotId(), maxHiveTablePropertySize);
}
} catch (JsonProcessingException e) {
- LOG.warn("Failed to convert current snapshot({}) summary to a json
string", currentSnapshot.snapshotId(), e);
+ LOG.warn("Failed to convert current snapshot({}) summary to a json
string",
+ currentSnapshot.snapshotId(), e);
}
}
@VisibleForTesting
- static void setPartitionSpec(TableMetadata metadata, Map<String, String>
parameters, long maxHiveTablePropertySize) {
+ static void setPartitionSpec(
+ TableMetadata metadata, Map<String, String> parameters, long
maxHiveTablePropertySize) {
parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
- if (exposeInHmsProperties(maxHiveTablePropertySize) && metadata.spec() !=
null && metadata.spec().isPartitioned()) {
+ if (exposeInHmsProperties(maxHiveTablePropertySize) &&
+ metadata.spec() != null && metadata.spec().isPartitioned()) {
String spec = PartitionSpecParser.toJson(metadata.spec());
setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec,
maxHiveTablePropertySize);
}
@@ -232,17 +261,19 @@ public static PartitionSpec getPartitionSpec(Map<String, String> props, Schema s
}
@VisibleForTesting
- static void setSortOrder(TableMetadata metadata, Map<String, String>
parameters, long maxHiveTablePropertySize) {
+ static void setSortOrder(
+ TableMetadata metadata, Map<String, String> parameters, long
maxHiveTablePropertySize) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
if (exposeInHmsProperties(maxHiveTablePropertySize) &&
- metadata.sortOrder() != null &&
- metadata.sortOrder().isSorted()) {
+ metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder,
maxHiveTablePropertySize);
}
}
- public static void setSchema(Schema schema, Map<String, String> parameters,
long maxHiveTablePropertySize) {
+ @VisibleForTesting
+ static void setSchema(
+ Schema schema, Map<String, String> parameters, long
maxHiveTablePropertySize) {
parameters.remove(TableProperties.CURRENT_SCHEMA);
if (exposeInHmsProperties(maxHiveTablePropertySize) && schema != null) {
String jsonSchema = SchemaParser.toJson(schema);
@@ -251,13 +282,12 @@ public static void setSchema(Schema schema, Map<String, String> parameters, long
}
private static void setField(
- Map<String, String> parameters,
- String key, String value,
- long maxHiveTablePropertySize) {
+ Map<String, String> parameters, String key, String value, long
maxHiveTablePropertySize) {
if (value.length() <= maxHiveTablePropertySize) {
parameters.put(key, value);
} else {
- LOG.warn("Not exposing {} in HMS since it exceeds {} characters", key,
maxHiveTablePropertySize);
+ LOG.warn("Not exposing {} in HMS since it exceeds {} characters",
+ key, maxHiveTablePropertySize);
}
}
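
The translateToIcebergProp change above keeps the lookup driven by a bidirectional map: an HMS key resolves to its Iceberg counterpart and unknown keys pass through unchanged. A minimal sketch of that lookup, using plain Guava rather than Iceberg's relocated package; the class and method names here are invented for illustration only:

    import com.google.common.collect.BiMap;
    import com.google.common.collect.ImmutableBiMap;

    class PropertyTranslationSketch {
      // Same pairing as ICEBERG_TO_HMS_TRANSLATION in the diff: Iceberg key -> HMS key.
      private static final BiMap<String, String> ICEBERG_TO_HMS =
          ImmutableBiMap.of("gc.enabled", "external.table.purge");

      // Mirrors translateToIcebergProp: HMS key -> Iceberg key, unknown keys pass through.
      static String toIcebergProp(String hmsProp) {
        return ICEBERG_TO_HMS.inverse().getOrDefault(hmsProp, hmsProp);
      }

      public static void main(String[] args) {
        System.out.println(toIcebergProp("external.table.purge")); // prints gc.enabled
        System.out.println(toIcebergProp("write.format.default")); // unchanged pass-through
      }
    }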
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 72280449ad5..d7193d4510a 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -494,7 +494,7 @@ public void createNamespace(Namespace namespace, Map<String, String> meta) {
@Override
public List<Namespace> listNamespaces(Namespace namespace) {
- if (!isValidateNamespace(namespace) && !namespace.isEmpty()) {
+ if (!namespace.isEmpty() && (!isValidateNamespace(namespace) || !namespaceExists(namespace))) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
if (!namespace.isEmpty()) {
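
The listNamespaces change above makes the catalog reject a parent namespace that is invalid or does not exist with NoSuchNamespaceException instead of treating it as listable. A hedged caller-side sketch; catalog construction is omitted and the helper name is invented:

    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.SupportsNamespaces;
    import org.apache.iceberg.exceptions.NoSuchNamespaceException;

    class ListNamespacesSketch {
      // HiveCatalog implements SupportsNamespaces; with this patch a missing parent
      // namespace surfaces as an exception rather than an empty listing.
      static void printChildren(SupportsNamespaces catalog, String parent) {
        try {
          catalog.listNamespaces(Namespace.of(parent)).forEach(System.out::println);
        } catch (NoSuchNamespaceException e) {
          System.out.println("Namespace does not exist: " + parent);
        }
      }
    }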
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
index d69652d80b2..937939b4816 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
@@ -33,6 +33,8 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchIcebergViewException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -42,12 +44,11 @@
import org.slf4j.LoggerFactory;
/** All the HMS operations like table,view,materialized_view should implement this. */
-public interface HiveOperationsBase {
+interface HiveOperationsBase {
Logger LOG = LoggerFactory.getLogger(HiveOperationsBase.class);
- String HIVE_ICEBERG_STORAGE_HANDLER = "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
-
- // The max size is based on HMS backend database. For Hive versions below 2.3, the max table parameter size is 4000
+ // The max size is based on HMS backend database. For Hive versions below 2.3, the max table
+ // parameter size is 4000
// characters, see https://issues.apache.org/jira/browse/HIVE-12274
// set to 0 to not expose Iceberg metadata in HMS Table properties.
String HIVE_TABLE_PROPERTY_MAX_SIZE = "iceberg.hive.table-property-max-size";
@@ -56,8 +57,6 @@ public interface HiveOperationsBase {
String NO_LOCK_EXPECTED_VALUE = "expected_parameter_value";
String ICEBERG_VIEW_TYPE_VALUE = "iceberg-view";
- TableType tableType();
-
enum ContentType {
TABLE("Table"),
VIEW("View");
@@ -73,6 +72,8 @@ public String value() {
}
}
+ TableType tableType();
+
ClientPool<IMetaStoreClient, TException> metaClients();
long maxHiveTablePropertySize();
@@ -92,51 +93,74 @@ default Table loadHmsTable() throws TException, InterruptedException {
default Map<String, String> hmsEnvContext(String metadataLocation) {
return metadataLocation == null ? ImmutableMap.of() :
- ImmutableMap.of(
- NO_LOCK_EXPECTED_KEY,
- BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
- NO_LOCK_EXPECTED_VALUE,
- metadataLocation);
+ ImmutableMap.of(
+ NO_LOCK_EXPECTED_KEY,
+ BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
+ NO_LOCK_EXPECTED_VALUE,
+ metadataLocation);
+ }
+
+ private static boolean isValidIcebergView(Table table) {
+ String tableType =
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+ return
TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) &&
+ ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableType);
+ }
+
+ private static boolean isValidIcebergTable(Table table) {
+ String tableType =
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+ return
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableType);
}
static void validateTableIsIceberg(Table table, String fullName) {
String tableType =
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
NoSuchIcebergTableException.check(
- tableType != null &&
tableType.equalsIgnoreCase(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE),
- "Not an iceberg table: %s (type=%s)", fullName, tableType);
+ isValidIcebergTable(table), "Not an iceberg table: %s (type=%s)",
fullName, tableType);
}
static void validateTableIsIcebergView(Table table, String fullName) {
String tableTypeProp =
table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
NoSuchIcebergViewException.check(
-
TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) &&
- ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
- "Not an iceberg view: %s (type=%s) (tableType=%s)",
- fullName,
- tableTypeProp,
- table.getTableType());
+ isValidIcebergView(table),
+ "Not an iceberg view: %s (type=%s) (tableType=%s)",
+ fullName,
+ tableTypeProp,
+ table.getTableType());
+ }
+
+ static void validateIcebergTableNotLoadedAsIcebergView(Table table, String
fullName) {
+ if (!isValidIcebergView(table) && isValidIcebergTable(table)) {
+ throw new NoSuchViewException("View does not exist: %s", fullName);
+ }
+ }
+
+ static void validateIcebergViewNotLoadedAsIcebergTable(Table table, String
fullName) {
+ if (!isValidIcebergTable(table) && isValidIcebergView(table)) {
+ throw new NoSuchTableException("Table does not exist: %s", fullName);
+ }
}
default void persistTable(Table hmsTable, boolean updateHiveTable, String
metadataLocation)
throws TException, InterruptedException {
if (updateHiveTable) {
- metaClients().run(
- client -> {
- MetastoreUtil.alterTable(
- client, database(), table(), hmsTable,
hmsEnvContext(metadataLocation));
- return null;
- });
+ metaClients()
+ .run(
+ client -> {
+ MetastoreUtil.alterTable(
+ client, database(), table(), hmsTable,
hmsEnvContext(metadataLocation));
+ return null;
+ });
} else {
- metaClients().run(
- client -> {
- client.createTable(hmsTable);
- return null;
- });
+ metaClients()
+ .run(
+ client -> {
+ client.createTable(hmsTable);
+ return null;
+ });
}
}
static StorageDescriptor storageDescriptor(
- Schema schema, String location, boolean hiveEngineEnabled) {
+ Schema schema, String location, boolean hiveEngineEnabled) {
final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(HiveSchemaUtil.convert(schema));
storageDescriptor.setLocation(location);
@@ -169,10 +193,10 @@ static void cleanupMetadata(FileIO io, String commitStatus, String metadataLocat
}
static void cleanupMetadataAndUnlock(
- FileIO io,
- BaseMetastoreOperations.CommitStatus commitStatus,
- String metadataLocation,
- HiveLock lock) {
+ FileIO io,
+ BaseMetastoreOperations.CommitStatus commitStatus,
+ String metadataLocation,
+ HiveLock lock) {
try {
cleanupMetadata(io, commitStatus.name(), metadataLocation);
} finally {
@@ -184,24 +208,27 @@ default Table newHmsTable(String hmsTableOwner) {
Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be
null");
final long currentTimeMillis = System.currentTimeMillis();
- Table newTable = new Table(table(),
- database(),
- hmsTableOwner,
- (int) currentTimeMillis / 1000,
- (int) currentTimeMillis / 1000,
- Integer.MAX_VALUE,
- null,
- Collections.emptyList(),
- Maps.newHashMap(),
- null,
- null,
- tableType().name());
+ Table newTable =
+ new Table(
+ table(),
+ database(),
+ hmsTableOwner,
+ (int) currentTimeMillis / 1000,
+ (int) currentTimeMillis / 1000,
+ Integer.MAX_VALUE,
+ null,
+ Collections.emptyList(),
+ Maps.newHashMap(),
+ null,
+ null,
+ tableType().name());
if (tableType().equals(TableType.EXTERNAL_TABLE)) {
newTable
.getParameters()
.put("EXTERNAL", "TRUE"); // using the external table type also
requires this
}
+
return newTable;
}
}
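
The new cross-checks above mean that resolving a view identifier which actually points at an Iceberg table fails with NoSuchViewException, and the reverse case fails with NoSuchTableException. A rough caller-side sketch; the class name is invented and catalog wiring is omitted:

    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.catalog.ViewCatalog;
    import org.apache.iceberg.exceptions.NoSuchViewException;

    class LoadViewSketch {
      static void tryLoadAsView(ViewCatalog viewCatalog, TableIdentifier id) {
        try {
          viewCatalog.loadView(id);
        } catch (NoSuchViewException e) {
          // With this patch, an HMS entry at 'id' that is an Iceberg table
          // (not an Iceberg view) is reported as "View does not exist".
        }
      }
    }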
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 2eab6f28065..a0f0e779d9b 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -72,8 +72,12 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
private final ClientPool<IMetaStoreClient, TException> metaClients;
protected HiveTableOperations(
- Configuration conf, ClientPool<IMetaStoreClient, TException>
metaClients, FileIO fileIO,
- String catalogName, String database, String table) {
+ Configuration conf,
+ ClientPool<IMetaStoreClient, TException> metaClients,
+ FileIO fileIO,
+ String catalogName,
+ String database,
+ String table) {
this.conf = conf;
this.metaClients = metaClients;
this.fileIO = fileIO;
@@ -104,6 +108,10 @@ protected void doRefresh() {
String metadataLocation = null;
try {
Table table = metaClients.run(client -> client.getTable(database,
tableName));
+
+ // Check if we are trying to load an Iceberg View as a Table
+ HiveOperationsBase.validateIcebergViewNotLoadedAsIcebergTable(table,
fullName);
+ // Check if it is a valid Iceberg Table
HiveOperationsBase.validateTableIsIceberg(table, fullName);
metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
@@ -135,7 +143,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS,
false);
BaseMetastoreOperations.CommitStatus commitStatus =
- BaseMetastoreOperations.CommitStatus.FAILURE;
+ BaseMetastoreOperations.CommitStatus.FAILURE;
boolean updateHiveTable = false;
HiveLock lock = lockObject(base);
@@ -149,7 +157,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (newTable &&
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) !=
null) {
if
(TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tbl.getTableType())) {
throw new AlreadyExistsException(
- "View with same name already exists: %s.%s", database,
tableName);
+ "View with same name already exists: %s.%s", database,
tableName);
}
throw new AlreadyExistsException("Table already exists: %s.%s",
database, tableName);
}
@@ -157,11 +165,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
updateHiveTable = true;
LOG.debug("Committing existing table: {}", fullName);
} else {
- tbl = newHmsTable(metadata.property(HiveCatalog.HMS_TABLE_OWNER,
HiveHadoopUtil.currentUser()));
+ tbl = newHmsTable(
+ metadata.property(HiveCatalog.HMS_TABLE_OWNER,
HiveHadoopUtil.currentUser())
+ );
LOG.debug("Committing new table: {}", fullName);
}
- tbl.setSd(HiveOperationsBase.storageDescriptor(
+ tbl.setSd(
+ HiveOperationsBase.storageDescriptor(
metadata.schema(),
metadata.location(),
hiveEngineEnabled)); // set to pickup any schema changes
@@ -183,6 +194,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
.filter(key -> !metadata.properties().containsKey(key))
.collect(Collectors.toSet());
}
+
HMSTablePropertyHelper.updateHmsTableForIcebergTable(
newMetadataLocation,
tbl,
@@ -198,6 +210,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
lock.ensureActive();
+
try {
persistTable(
tbl, updateHiveTable, hiveLockEnabled(base, conf) ? null :
baseMetadataLocation);
@@ -208,9 +221,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
throw new CommitStateUnknownException(
"Failed to heartbeat for hive lock while " +
- "committing changes. This can lead to a concurrent commit
attempt be able to overwrite this commit. " +
- "Please check the commit history. If you are running into this
issue, try reducing " +
- "iceberg.hive.lock-heartbeat-interval-ms.",
+ "committing changes. This can lead to a concurrent commit
attempt be able to overwrite this commit. " +
+ "Please check the commit history. If you are running into this
issue, try reducing " +
+ "iceberg.hive.lock-heartbeat-interval-ms.",
le);
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException(e, "Table already exists: %s.%s",
database, tableName);
@@ -225,16 +238,16 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (e.getMessage() != null && e.getMessage().contains("Table/View
'HIVE_LOCKS' does not exist")) {
throw new RuntimeException(
"Failed to acquire locks from metastore because the underlying
metastore " +
- "table 'HIVE_LOCKS' does not exist. This can occur when using
an embedded metastore which does not " +
- "support transactions. To fix this use an alternative
metastore.",
+ "table 'HIVE_LOCKS' does not exist. This can occur when
using an embedded metastore which does not " +
+ "support transactions. To fix this use an alternative
metastore.",
e);
}
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
if (e.getMessage() != null && e.getMessage().contains(
- "The table has been modified. The parameter value for key '" +
- HiveTableOperations.METADATA_LOCATION_PROP +
- "' is")) {
+ "The table has been modified. The parameter value for key '" +
+ HiveTableOperations.METADATA_LOCATION_PROP +
+ "' is")) {
// It's possible the HMS client incorrectly retries a successful
operation, due to network
// issue for example, and triggers this exception. So we need
double-check to make sure
// this is really a concurrent modification. Hitting this exception
means no pending
@@ -242,16 +255,17 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
commitStatus = checkCommitStatusStrict(newMetadataLocation,
metadata);
if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
throw new CommitFailedException(
- e, "The table %s.%s has been modified concurrently",
database, tableName);
+ e, "The table %s.%s has been modified concurrently", database,
tableName);
}
} else {
LOG.error(
- "Cannot tell if commit to {}.{} succeeded, attempting to
reconnect and check.",
- database,
- tableName,
- e);
+ "Cannot tell if commit to {}.{} succeeded, attempting to
reconnect and check.",
+ database,
+ tableName,
+ e);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
}
+
switch (commitStatus) {
case SUCCESS:
break;
@@ -276,7 +290,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus,
newMetadataLocation, lock);
}
- LOG.info("Committed to table {} with the new metadata location {}",
fullName, newMetadataLocation);
+ LOG.info(
+ "Committed to table {} with the new metadata location {}", fullName,
newMetadataLocation);
}
@Override
@@ -306,14 +321,17 @@ public ClientPool<IMetaStoreClient, TException> metaClients() {
/**
* Returns if the hive engine related values should be enabled on the table,
or not.
- * <p>
- * The decision is made like this:
+ *
+ * <p>The decision is made like this:
+ *
* <ol>
- * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
- * <li>If the table property is not set then check the hive-site.xml
property value
- * {@link ConfigProperties#ENGINE_HIVE_ENABLED}
- * <li>If none of the above is enabled then use the default value {@link
TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+ * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+ * <li>If the table property is not set then check the hive-site.xml
property value {@link
+ * ConfigProperties#ENGINE_HIVE_ENABLED}
+ * <li>If none of the above is enabled then use the default value {@link
+ * TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
* </ol>
+ *
* @param metadata Table metadata to use
* @param conf The hive configuration to use
* @return if the hive engine related values should be enabled or not
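
The hiveEngineEnabled javadoc reworked above describes a three-step precedence: table property, then hive-site.xml, then the library default. A minimal sketch of that order, assuming the documented property constants; the body is an illustration of the precedence, not the class's actual implementation:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.hive.ConfigProperties;

    class HiveEngineEnabledSketch {
      static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) {
        String tableLevel = metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED);
        if (tableLevel != null) {
          return Boolean.parseBoolean(tableLevel);         // 1. table property wins
        }
        return conf.getBoolean(
            ConfigProperties.ENGINE_HIVE_ENABLED,          // 2. hive-site.xml property
            TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);  // 3. default value
      }
    }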
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
index 583998709fb..c0c959974a6 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
@@ -20,10 +20,7 @@
package org.apache.iceberg.hive;
import java.util.Collections;
-import java.util.Locale;
-import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -71,11 +68,11 @@ final class HiveViewOperations extends BaseViewOperations implements HiveOperati
private final String catalogName;
HiveViewOperations(
- Configuration conf,
- ClientPool<IMetaStoreClient, TException> metaClients,
- FileIO fileIO,
- String catalogName,
- TableIdentifier viewIdentifier) {
+ Configuration conf,
+ ClientPool<IMetaStoreClient, TException> metaClients,
+ FileIO fileIO,
+ String catalogName,
+ TableIdentifier viewIdentifier) {
this.conf = conf;
this.catalogName = catalogName;
this.metaClients = metaClients;
@@ -84,7 +81,7 @@ final class HiveViewOperations extends BaseViewOperations implements HiveOperati
this.database = viewIdentifier.namespace().level(0);
this.viewName = viewIdentifier.name();
this.maxHiveTablePropertySize =
- conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE,
HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+ conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE,
HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
}
@Override
@@ -94,10 +91,14 @@ public void doRefresh() {
try {
table = metaClients.run(client -> client.getTable(database, viewName));
+
+ // Check if we are trying to load an Iceberg Table as a View
+ HiveOperationsBase.validateIcebergTableNotLoadedAsIcebergView(table,
fullName);
+ // Check if it is a valid Iceberg View
HiveOperationsBase.validateTableIsIcebergView(table, fullName);
metadataLocation =
-
table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
} catch (NoSuchObjectException e) {
if (currentMetadataLocation() != null) {
@@ -105,7 +106,7 @@ public void doRefresh() {
}
} catch (TException e) {
String errMsg =
- String.format("Failed to get view info from metastore %s.%s",
database, viewName);
+ String.format("Failed to get view info from metastore %s.%s",
database, viewName);
throw new RuntimeException(errMsg, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -136,11 +137,11 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
// concurrent commit
if (newView &&
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP) !=
null) {
throw new AlreadyExistsException(
- "%s already exists: %s.%s",
- TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(
- tbl.getTableType()) ? ContentType.VIEW.value() :
ContentType.TABLE.value(),
- database,
- viewName);
+ "%s already exists: %s.%s",
+ TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(
+ tbl.getTableType()) ? ContentType.VIEW.value() :
ContentType.TABLE.value(),
+ database,
+ viewName);
}
updateHiveView = true;
@@ -151,32 +152,36 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
}
tbl.setSd(
- HiveOperationsBase.storageDescriptor(
- metadata.schema(),
- metadata.location(),
- hiveEngineEnabled)); // set to pick up any schema changes
+ HiveOperationsBase.storageDescriptor(
+ metadata.schema(),
+ metadata.location(),
+ hiveEngineEnabled)); // set to pick up any schema changes
String metadataLocation =
-
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+
tbl.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
String baseMetadataLocation = base != null ? base.metadataFileLocation()
: null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
throw new CommitFailedException(
- "Cannot commit: Base metadata location '%s' is not same as the
current view metadata location " +
- "'%s' for %s.%s",
- baseMetadataLocation, metadataLocation, database, viewName);
+ "Cannot commit: Base metadata location '%s' is not same as the
current view metadata location " +
+ "'%s' for %s.%s",
+ baseMetadataLocation, metadataLocation, database, viewName);
}
// get Iceberg props that have been removed
Set<String> removedProps = emptySet();
if (base != null) {
removedProps =
- base.properties().keySet().stream()
- .filter(key -> !metadata.properties().containsKey(key))
- .collect(Collectors.toSet());
+ base.properties().keySet().stream()
+ .filter(key -> !metadata.properties().containsKey(key))
+ .collect(Collectors.toSet());
}
-
- setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps);
-
+ HMSTablePropertyHelper.updateHmsTableForIcebergView(
+ newMetadataLocation,
+ tbl,
+ metadata,
+ removedProps,
+ maxHiveTablePropertySize,
+ currentMetadataLocation());
lock.ensureActive();
try {
@@ -187,12 +192,12 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
} catch (LockException le) {
commitStatus = CommitStatus.UNKNOWN;
throw new CommitStateUnknownException(
- "Failed to heartbeat for hive lock while " +
- "committing changes. This can lead to a concurrent
commit attempt be able to overwrite " +
- "this commit. " +
- "Please check the commit history. If you are running
into this issue, try reducing " +
- "iceberg.hive.lock-heartbeat-interval-ms.",
- le);
+ "Failed to heartbeat for hive lock while " +
+ "committing changes. This can lead to a concurrent commit
attempt be able to overwrite " +
+ "this commit. " +
+ "Please check the commit history. If you are running into this
issue, try reducing " +
+ "iceberg.hive.lock-heartbeat-interval-ms.",
+ le);
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
throw new AlreadyExistsException(e, "View already exists: %s.%s",
database, viewName);
@@ -204,34 +209,34 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
} catch (Throwable e) {
if (e.getMessage() != null &&
- e.getMessage().contains(
- "The table has been modified. The parameter value for
key '" +
-
BaseMetastoreTableOperations.METADATA_LOCATION_PROP +
- "' is")) {
+ e.getMessage().contains(
+ "The table has been modified. The parameter value for key '" +
+ BaseMetastoreTableOperations.METADATA_LOCATION_PROP +
+ "' is")) {
throw new CommitFailedException(
- e, "The view %s.%s has been modified concurrently",
database, viewName);
+ e, "The view %s.%s has been modified concurrently", database,
viewName);
}
if (e.getMessage() != null && e.getMessage().contains("Table/View
'HIVE_LOCKS' does not exist")) {
throw new RuntimeException(
- "Failed to acquire locks from metastore because the
underlying metastore " +
- "view 'HIVE_LOCKS' does not exist. This can occur
when using an embedded metastore " +
- "which does not support transactions. To fix this
use an alternative metastore.",
- e);
+ "Failed to acquire locks from metastore because the underlying
metastore " +
+ "view 'HIVE_LOCKS' does not exist. This can occur when using
an embedded metastore " +
+ "which does not support transactions. To fix this use an
alternative metastore.",
+ e);
}
LOG.error(
- "Cannot tell if commit to {}.{} succeeded, attempting to
reconnect and check.",
- database,
- viewName,
- e);
+ "Cannot tell if commit to {}.{} succeeded, attempting to reconnect
and check.",
+ database,
+ viewName,
+ e);
commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
commitStatus =
- checkCommitStatus(
- viewName,
- newMetadataLocation,
- metadata.properties(),
- () ->
checkCurrentMetadataLocation(newMetadataLocation));
+ checkCommitStatus(
+ viewName,
+ newMetadataLocation,
+ metadata.properties(),
+ () -> checkCurrentMetadataLocation(newMetadataLocation));
switch (commitStatus) {
case SUCCESS:
break;
@@ -243,7 +248,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
}
} catch (TException e) {
throw new RuntimeException(
- String.format("Metastore operation failed for %s.%s", database,
viewName), e);
+ String.format("Metastore operation failed for %s.%s", database,
viewName), e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -257,7 +262,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
}
LOG.info(
- "Committed to view {} with the new metadata location {}",
fullName, newMetadataLocation);
+ "Committed to view {} with the new metadata location {}", fullName,
newMetadataLocation);
}
/**
@@ -271,36 +276,6 @@ private boolean checkCurrentMetadataLocation(String newMetadataLocation) {
return newMetadataLocation.equals(metadata.metadataFileLocation());
}
- private void setHmsTableParameters(
- String newMetadataLocation, Table tbl, ViewMetadata metadata,
Set<String> obsoleteProps) {
- Map<String, String> parameters =
-
Optional.ofNullable(tbl.getParameters()).orElseGet(Maps::newHashMap);
-
- // push all Iceberg view properties into HMS
- metadata.properties().entrySet().stream()
- .filter(entry ->
!entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
- .forEach(entry -> parameters.put(entry.getKey(),
entry.getValue()));
- if (metadata.uuid() != null) {
- parameters.put("uuid", metadata.uuid());
- }
-
- // remove any props from HMS that are no longer present in Iceberg view
props
- obsoleteProps.forEach(parameters::remove);
-
- parameters.put(
- BaseMetastoreTableOperations.TABLE_TYPE_PROP,
- ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
- parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
newMetadataLocation);
-
- if (currentMetadataLocation() != null &&
!currentMetadataLocation().isEmpty()) {
- parameters.put(
- BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP,
currentMetadataLocation());
- }
-
-// setSchema(metadata.schema(), parameters);
- tbl.setParameters(parameters);
- }
-
private static boolean hiveLockEnabled(Configuration conf) {
return conf.getBoolean(ConfigProperties.LOCK_HIVE_ENABLED, true);
}
@@ -308,23 +283,23 @@ private static boolean hiveLockEnabled(Configuration conf) {
private Table newHMSView(ViewMetadata metadata) {
final long currentTimeMillis = System.currentTimeMillis();
String hmsTableOwner =
- PropertyUtil.propertyAsString(
- metadata.properties(), HiveCatalog.HMS_TABLE_OWNER,
HiveHadoopUtil.currentUser());
+ PropertyUtil.propertyAsString(
+ metadata.properties(), HiveCatalog.HMS_TABLE_OWNER,
HiveHadoopUtil.currentUser());
String sqlQuery = sqlFor(metadata);
return new Table(
- table(),
- database(),
- hmsTableOwner,
- (int) currentTimeMillis / 1000,
- (int) currentTimeMillis / 1000,
- Integer.MAX_VALUE,
- null,
- Collections.emptyList(),
- Maps.newHashMap(),
- sqlQuery,
- sqlQuery,
- tableType().name());
+ table(),
+ database(),
+ hmsTableOwner,
+ (int) currentTimeMillis / 1000,
+ (int) currentTimeMillis / 1000,
+ Integer.MAX_VALUE,
+ null,
+ Collections.emptyList(),
+ Maps.newHashMap(),
+ sqlQuery,
+ sqlQuery,
+ tableType().name());
}
private String sqlFor(ViewMetadata metadata) {
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 70ce12a8310..7bf66012030 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -85,7 +85,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-public class HiveTableTest extends HiveTableBaseTest {
+public class HiveTableTest extends HiveTableTestBase {
static final String NON_DEFAULT_DATABASE = "nondefault";
@TempDir
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTestBase.java
similarity index 99%
rename from iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
rename to iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTestBase.java
index 321c1e59716..46ee5ee0cf5 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableBaseTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveTableTestBase.java
@@ -45,7 +45,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;
-public class HiveTableBaseTest {
+public class HiveTableTestBase {
static final String TABLE_NAME = "tbl";
static final String DB_NAME = "hivedb";
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index b59828466d6..36f08e58633 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -1076,6 +1076,7 @@ public void testSnapshotStatsTableProperties() throws Exception {
@Test
public void testSetSnapshotSummary() throws Exception {
+ final long maxHiveTablePropertySize = 4000;
Snapshot snapshot = mock(Snapshot.class);
Map<String, String> summary = Maps.newHashMap();
when(snapshot.summary()).thenReturn(summary);
@@ -1086,7 +1087,6 @@ public void testSetSnapshotSummary() throws Exception {
}
assertThat(JsonUtil.mapper().writeValueAsString(summary).length()).isLessThan(4000);
Map<String, String> parameters = Maps.newHashMap();
- final long maxHiveTablePropertySize = 4000;
HMSTablePropertyHelper.setSnapshotSummary(parameters, snapshot,
maxHiveTablePropertySize);
assertThat(parameters).as("The snapshot summary must be in
parameters").hasSize(1);
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
index 7bd2c882314..343f37dcb42 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java
@@ -48,7 +48,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-public class TestHiveCommits extends HiveTableBaseTest {
+public class TestHiveCommits extends HiveTableTestBase {
@Test
public void testSuppressUnlockExceptions() {
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java
similarity index 99%
rename from iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
rename to iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java
index 2c5246e8f9d..994ca189971 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java
@@ -52,7 +52,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-public class HiveCreateReplaceTableTest {
+public class TestHiveCreateReplaceTable {
private static final String DB_NAME = "hivedb";
private static final String TABLE_NAME = "tbl";
diff --git a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
index b2e41bfd446..48f60ec716e 100644
--- a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
+++ b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveTableConcurrency.java
@@ -40,7 +40,7 @@
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.assertj.core.api.Assertions.assertThat;
-public class TestHiveTableConcurrency extends HiveTableBaseTest {
+public class TestHiveTableConcurrency extends HiveTableTestBase {
@Test
public synchronized void testConcurrentFastAppends() {
diff --git a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
index 388cdf35b3b..2259eff2916 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/describe_iceberg_metadata_tables.q.out
@@ -58,7 +58,7 @@ key_metadata binary Encryption key
metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -171,7 +171,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -210,7 +210,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -237,7 +237,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -287,7 +287,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -314,7 +314,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -342,7 +342,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -462,7 +462,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -503,7 +503,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -531,7 +531,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -584,7 +584,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -612,7 +612,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -639,7 +639,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -752,7 +752,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -792,7 +792,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -820,7 +820,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -873,7 +873,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
@@ -901,7 +901,7 @@ key_metadata binary
Encryption key metadata blob
split_offsets array<bigint> Splittable offsets
equality_ids array<int> Equality comparison field IDs
sort_order_id int Sort order ID
-first_row_id bigint Starting row ID to assign to
new rows
+first_row_id bigint The first row ID assigned to
the first row in the data file
referenced_data_file string Fully qualified location (URI
with FS scheme) of a data file that all deletes reference
content_offset bigint The offset in the file where
the content starts
content_size_in_bytes bigint The length of referenced
content stored in the file
diff --git a/iceberg/patched-iceberg-api/pom.xml b/iceberg/patched-iceberg-api/pom.xml
index 19ec82fcb85..fbdd0b72322 100644
--- a/iceberg/patched-iceberg-api/pom.xml
+++ b/iceberg/patched-iceberg-api/pom.xml
@@ -36,11 +36,6 @@
<version>${iceberg.version}</version>
<optional>true</optional>
</dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-bundled-guava</artifactId>
- <version>${iceberg.version}</version>
- </dependency>
</dependencies>
<build>
<plugins>
@@ -64,7 +59,6 @@
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<excludes>
- **/StructProjection.class
</excludes>
</artifactItem>
</artifactItems>
diff --git a/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java b/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
deleted file mode 100644
index 15f5d965206..00000000000
--- a/iceberg/patched-iceberg-api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.util;
-
-import java.util.List;
-import java.util.Set;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.ListType;
-import org.apache.iceberg.types.Types.MapType;
-import org.apache.iceberg.types.Types.StructType;
-
-public class StructProjection implements StructLike {
- /**
- * Creates a projecting wrapper for {@link StructLike} rows.
- *
- * <p>This projection does not work with repeated types like lists and maps.
- *
- * @param schema schema of rows wrapped by this projection
- * @param ids field ids from the row schema to project
- * @return a wrapper to project rows
- */
- public static StructProjection create(Schema schema, Set<Integer> ids) {
- StructType structType = schema.asStruct();
- return new StructProjection(structType, TypeUtil.project(structType, ids));
- }
-
- /**
- * Creates a projecting wrapper for {@link StructLike} rows.
- *
- * <p>This projection does not work with repeated types like lists and maps.
- *
- * @param dataSchema schema of rows wrapped by this projection
- * @param projectedSchema result schema of the projected rows
- * @return a wrapper to project rows
- */
- public static StructProjection create(Schema dataSchema, Schema
projectedSchema) {
- return new StructProjection(dataSchema.asStruct(),
projectedSchema.asStruct());
- }
-
- /**
- * Creates a projecting wrapper for {@link StructLike} rows.
- *
- * <p>This projection does not work with repeated types like lists and maps.
- *
- * @param structType type of rows wrapped by this projection
- * @param projectedStructType result type of the projected rows
- * @return a wrapper to project rows
- */
- public static StructProjection create(StructType structType, StructType
projectedStructType) {
- return new StructProjection(structType, projectedStructType);
- }
-
- /**
- * Creates a projecting wrapper for {@link StructLike} rows.
- *
- * <p>This projection allows missing fields and does not work with repeated
types like lists and
- * maps.
- *
- * @param structType type of rows wrapped by this projection
- * @param projectedStructType result type of the projected rows
- * @return a wrapper to project rows
- */
- public static StructProjection createAllowMissing(
- StructType structType, StructType projectedStructType) {
- return new StructProjection(structType, projectedStructType, true);
- }
-
- private final StructType type;
- private final int[] positionMap;
- private final StructProjection[] nestedProjections;
- private StructLike struct;
-
- private StructProjection(
- StructType type, int[] positionMap, StructProjection[]
nestedProjections) {
- this.type = type;
- this.positionMap = positionMap;
- this.nestedProjections = nestedProjections;
- }
-
- private StructProjection(StructType structType, StructType projection) {
- this(structType, projection, false);
- }
-
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
- private StructProjection(StructType structType, StructType projection,
boolean allowMissing) {
- this.type = projection;
- this.positionMap = new int[projection.fields().size()];
- this.nestedProjections = new StructProjection[projection.fields().size()];
-
- // set up the projection positions and any nested projections that are
needed
- List<Types.NestedField> dataFields = structType.fields();
- for (int pos = 0; pos < positionMap.length; pos += 1) {
- Types.NestedField projectedField = projection.fields().get(pos);
-
- boolean found = false;
- for (int i = 0; !found && i < dataFields.size(); i += 1) {
- Types.NestedField dataField = dataFields.get(i);
- if (projectedField.fieldId() == dataField.fieldId()) {
- found = true;
- positionMap[pos] = i;
- switch (projectedField.type().typeId()) {
- case STRUCT:
- nestedProjections[pos] =
- new StructProjection(
- dataField.type().asStructType(),
projectedField.type().asStructType());
- break;
- case MAP:
- MapType projectedMap = projectedField.type().asMapType();
- MapType originalMap = dataField.type().asMapType();
-
- boolean keyProjectable =
- !projectedMap.keyType().isNestedType() ||
- projectedMap.keyType().equals(originalMap.keyType());
- boolean valueProjectable =
- !projectedMap.valueType().isNestedType() ||
- projectedMap.valueType().equals(originalMap.valueType());
- Preconditions.checkArgument(
- keyProjectable && valueProjectable,
- "Cannot project a partial map key or value struct. Trying to
project %s out of %s",
- projectedField,
- dataField);
-
- nestedProjections[pos] = null;
- break;
- case LIST:
- ListType projectedList = projectedField.type().asListType();
- ListType originalList = dataField.type().asListType();
-
- boolean elementProjectable =
- !projectedList.elementType().isNestedType() ||
-
projectedList.elementType().equals(originalList.elementType());
- Preconditions.checkArgument(
- elementProjectable,
- "Cannot project a partial list element struct. Trying to
project %s out of %s",
- projectedField,
- dataField);
-
- nestedProjections[pos] = null;
- break;
- default:
- nestedProjections[pos] = null;
- }
- }
- }
-
- if (!found && projectedField.isOptional() && allowMissing) {
- positionMap[pos] = -1;
- nestedProjections[pos] = null;
- } else if (!found) {
- throw new IllegalArgumentException(
- String.format("Cannot find field %s in %s", projectedField,
structType));
- }
- }
- }
-
- public int projectedFields() {
- return (int) Ints.asList(positionMap).stream().filter(val -> val !=
-1).count();
- }
-
- public StructProjection wrap(StructLike newStruct) {
- this.struct = newStruct;
- return this;
- }
-
- public StructProjection copyFor(StructLike newStruct) {
- return new StructProjection(type, positionMap,
nestedProjections).wrap(newStruct);
- }
-
- @Override
- public int size() {
- return type.fields().size();
- }
-
- @Override
- public <T> T get(int pos, Class<T> javaClass) {
- // struct can be null if wrap is not called first before the get call
- // or if a null struct is wrapped.
- if (struct == null) {
- return null;
- }
-
- int structPos = positionMap[pos];
- if (nestedProjections[pos] != null) {
- StructLike nestedStruct = struct.get(structPos, StructLike.class);
- if (nestedStruct == null) {
- return null;
- }
-
- return javaClass.cast(nestedProjections[pos].wrap(nestedStruct));
- }
-
- if (structPos != -1) {
- return struct.get(structPos, javaClass);
- } else {
- return null;
- }
- }
-
- @Override
- public <T> void set(int pos, T value) {
- throw new UnsupportedOperationException("Cannot set fields in a
TypeProjection");
- }
-}
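
Note: the patched copy of StructProjection above is removed because Iceberg 1.10.0 ships the required behaviour upstream. For readers unfamiliar with the API it exposes, here is a minimal usage sketch; the schema, field ids and the row below are hypothetical and only the methods shown in the deleted class are used.

    import java.util.Collections;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructProjection;

    class StructProjectionSketch {
      public static void main(String[] args) {
        // Hypothetical two-column schema; only field id 1 ("id") is projected.
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));
        StructProjection projection = StructProjection.create(schema, Collections.singleton(1));

        // A throwaway StructLike row matching the full schema.
        StructLike row = new StructLike() {
          private final Object[] values = {42L, "alice"};
          @Override public int size() { return values.length; }
          @Override public <T> T get(int pos, Class<T> javaClass) { return javaClass.cast(values[pos]); }
          @Override public <T> void set(int pos, T value) { values[pos] = value; }
        };

        // wrap(...) reuses the projection wrapper; position 0 of the projected type is "id".
        Long id = projection.wrap(row).get(0, Long.class);
        System.out.println(id); // 42
      }
    }
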
diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml
index 6311294f51f..d266b814a45 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -94,10 +94,6 @@
<excludes>
**/HadoopInputFile.class
**/HadoopTableOperations.class
- **/StructLikeMap.class
- **/StructLikeWrapper.class
- org.apache.iceberg.PartitionsTable.class
- org.apache.iceberg.ManifestFilterManager.class
</excludes>
</artifactItem>
</artifactItems>
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
deleted file mode 100644
index 716f8a8a0a8..00000000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.ResidualEvaluator;
-import org.apache.iceberg.expressions.StrictMetricsEvaluator;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.CharSequenceSet;
-import org.apache.iceberg.util.ManifestFileUtil;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.PartitionSet;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.Tasks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * This patch effectively reverts the change introduced in Iceberg version
1.9.1
- * (related to #11131, "Core: Optimize MergingSnapshotProducer to use
referenced manifests to determine if manifest
- * needs to be rewritten"). The original change caused orphan delete files to
remain in manifests,
- * which led to incorrect Iceberg table statistics and inaccurate row counts
when tables were
- * queried via Hive. This patch aims to correct the handling of orphan
positional delete files during manifest
- * filtering to ensure accurate table state.
- *
- */
-
-abstract class ManifestFilterManager<F extends ContentFile<F>> {
- private static final Logger LOG =
LoggerFactory.getLogger(ManifestFilterManager.class);
- private static final Joiner COMMA = Joiner.on(",");
-
- protected static class DeleteException extends ValidationException {
- private final String partition;
-
- private DeleteException(String partition) {
- super("Operation would delete existing data");
- this.partition = partition;
- }
-
- public String partition() {
- return partition;
- }
- }
-
- private final Map<Integer, PartitionSpec> specsById;
- private final PartitionSet deleteFilePartitions;
- private final Set<F> deleteFiles = newFileSet();
- private final PartitionSet dropPartitions;
- private final CharSequenceSet deletePaths = CharSequenceSet.empty();
- private Expression deleteExpression = Expressions.alwaysFalse();
- private long minSequenceNumber = 0;
- private boolean failAnyDelete = false;
- private boolean failMissingDeletePaths = false;
- private int duplicateDeleteCount = 0;
- private boolean caseSensitive = true;
-
- // cache filtered manifests to avoid extra work when commits fail.
- private final Map<ManifestFile, ManifestFile> filteredManifests =
Maps.newConcurrentMap();
-
- // tracking where files were deleted to validate retries quickly
- private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
- Maps.newConcurrentMap();
-
- private final Supplier<ExecutorService> workerPoolSupplier;
-
- protected ManifestFilterManager(
- Map<Integer, PartitionSpec> specsById, Supplier<ExecutorService>
executorSupplier) {
- this.specsById = specsById;
- this.deleteFilePartitions = PartitionSet.create(specsById);
- this.dropPartitions = PartitionSet.create(specsById);
- this.workerPoolSupplier = executorSupplier;
- }
-
- protected abstract void deleteFile(String location);
-
- protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
-
- protected abstract ManifestReader<F> newManifestReader(ManifestFile
manifest);
-
- protected abstract Set<F> newFileSet();
-
- protected void failAnyDelete() {
- this.failAnyDelete = true;
- }
-
- protected void failMissingDeletePaths() {
- this.failMissingDeletePaths = true;
- }
-
- /**
- * Add a filter to match files to delete. A file will be deleted if all of
the rows it contains
- * match this or any other filter passed to this method.
- *
- * @param expr an expression to match rows.
- */
- protected void deleteByRowFilter(Expression expr) {
- Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
- invalidateFilteredCache();
- this.deleteExpression = Expressions.or(deleteExpression, expr);
- }
-
- /** Add a partition tuple to drop from the table during the delete phase. */
- protected void dropPartition(int specId, StructLike partition) {
- Preconditions.checkNotNull(partition, "Cannot delete files in invalid
partition: null");
- invalidateFilteredCache();
- dropPartitions.add(specId, partition);
- }
-
- /**
- * Set the sequence number used to remove old delete files.
- *
- * <p>Delete files with a sequence number older than the given value will be
removed. By setting
- * this to the sequence number of the oldest data file in the table, this
will continuously remove
- * delete files that are no longer needed because deletes cannot match any
existing rows in the
- * table.
- *
- * @param sequenceNumber a sequence number used to remove old delete files
- */
- protected void dropDeleteFilesOlderThan(long sequenceNumber) {
- Preconditions.checkArgument(
- sequenceNumber >= 0, "Invalid minimum data sequence number: %s",
sequenceNumber);
- this.minSequenceNumber = sequenceNumber;
- }
-
- void caseSensitive(boolean newCaseSensitive) {
- this.caseSensitive = newCaseSensitive;
- }
-
- /** Add a specific path to be deleted in the new snapshot. */
- void delete(F file) {
- Preconditions.checkNotNull(file, "Cannot delete file: null");
- invalidateFilteredCache();
- deleteFiles.add(file);
- deleteFilePartitions.add(file.specId(), file.partition());
- }
-
- /** Add a specific path to be deleted in the new snapshot. */
- void delete(CharSequence path) {
- Preconditions.checkNotNull(path, "Cannot delete file path: null");
- invalidateFilteredCache();
- deletePaths.add(path);
- }
-
- boolean containsDeletes() {
- return !deletePaths.isEmpty() || !deleteFiles.isEmpty() ||
deleteExpression != Expressions.alwaysFalse() ||
- !dropPartitions.isEmpty();
- }
-
- /**
- * Filter deleted files out of a list of manifests.
- *
- * @param tableSchema the current table schema
- * @param manifests a list of manifests to be filtered
- * @return an array of filtered manifests
- */
- List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile>
manifests) {
- if (manifests == null || manifests.isEmpty()) {
- validateRequiredDeletes();
- return ImmutableList.of();
- }
-
- ManifestFile[] filtered = new ManifestFile[manifests.size()];
- // open all of the manifest files in parallel, use index to avoid
reordering
- Tasks.range(filtered.length)
- .stopOnFailure()
- .throwFailureWhenFinished()
- .executeWith(workerPoolSupplier.get())
- .run(
- index -> {
- ManifestFile manifest = filterManifest(tableSchema,
manifests.get(index));
- filtered[index] = manifest;
- });
-
- validateRequiredDeletes(filtered);
-
- return Arrays.asList(filtered);
- }
-
- /**
- * Creates a snapshot summary builder with the files deleted from the set of
filtered manifests.
- *
- * @param manifests a set of filtered manifests
- */
- SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
- SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
-
- for (ManifestFile manifest : manifests) {
- PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
- Iterable<F> manifestDeletes =
filteredManifestToDeletedFiles.get(manifest);
- if (manifestDeletes != null) {
- for (F file : manifestDeletes) {
- summaryBuilder.deletedFile(manifestSpec, file);
- }
- }
- }
-
- summaryBuilder.incrementDuplicateDeletes(duplicateDeleteCount);
-
- return summaryBuilder;
- }
-
- /**
- * Throws a {@link ValidationException} if any deleted file was not present
in a filtered
- * manifest.
- *
- * @param manifests a set of filtered manifests
- */
- @SuppressWarnings("CollectionUndefinedEquality")
- private void validateRequiredDeletes(ManifestFile... manifests) {
- if (failMissingDeletePaths) {
- Set<F> deletedFiles = deletedFiles(manifests);
- ValidationException.check(
- deletedFiles.containsAll(deleteFiles),
- "Missing required files to delete: %s",
- COMMA.join(
- deleteFiles.stream()
- .filter(f -> !deletedFiles.contains(f))
- .map(ContentFile::location)
- .collect(Collectors.toList())));
-
- CharSequenceSet deletedFilePaths =
- deletedFiles.stream()
- .map(ContentFile::location)
- .collect(Collectors.toCollection(CharSequenceSet::empty));
-
- ValidationException.check(
- deletedFilePaths.containsAll(deletePaths),
- "Missing required files to delete: %s",
- COMMA.join(Iterables.filter(deletePaths, path ->
!deletedFilePaths.contains(path))));
- }
- }
-
- private Set<F> deletedFiles(ManifestFile[] manifests) {
- Set<F> deletedFiles = newFileSet();
-
- if (manifests != null) {
- for (ManifestFile manifest : manifests) {
- Iterable<F> manifestDeletes =
filteredManifestToDeletedFiles.get(manifest);
- if (manifestDeletes != null) {
- for (F file : manifestDeletes) {
- deletedFiles.add(file);
- }
- }
- }
- }
-
- return deletedFiles;
- }
-
- /**
- * Deletes filtered manifests that were created by this class, but are not
in the committed
- * manifest set.
- *
- * @param committed the set of manifest files that were committed
- */
- void cleanUncommitted(Set<ManifestFile> committed) {
- // iterate over a copy of entries to avoid concurrent modification
- List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
- Lists.newArrayList(filteredManifests.entrySet());
-
- for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
- // remove any new filtered manifests that aren't in the committed list
- ManifestFile manifest = entry.getKey();
- ManifestFile filtered = entry.getValue();
- if (!committed.contains(filtered)) {
- // only delete if the filtered copy was created
- if (!manifest.equals(filtered)) {
- deleteFile(filtered.path());
- }
-
- // remove the entry from the cache
- filteredManifests.remove(manifest);
- }
- }
- }
-
- private void invalidateFilteredCache() {
- cleanUncommitted(SnapshotProducer.EMPTY_SET);
- }
-
- /**
- * @return a ManifestReader that is a filtered version of the input manifest.
- */
- private ManifestFile filterManifest(Schema tableSchema, ManifestFile
manifest) {
- ManifestFile cached = filteredManifests.get(manifest);
- if (cached != null) {
- return cached;
- }
-
- boolean hasLiveFiles = manifest.hasAddedFiles() ||
manifest.hasExistingFiles();
- if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
- filteredManifests.put(manifest, manifest);
- return manifest;
- }
-
- try (ManifestReader<F> reader = newManifestReader(manifest)) {
- PartitionSpec spec = reader.spec();
- PartitionAndMetricsEvaluator evaluator =
- new PartitionAndMetricsEvaluator(tableSchema, spec,
deleteExpression);
-
- // this assumes that the manifest doesn't have files to remove and
streams through the
- // manifest without copying data. if a manifest does have a file to
remove, this will break
- // out of the loop and move on to filtering the manifest.
- boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
- if (!hasDeletedFiles) {
- filteredManifests.put(manifest, manifest);
- return manifest;
- }
-
- return filterManifestWithDeletedFiles(evaluator, manifest, reader);
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close manifest: %s",
manifest);
- }
- }
-
- private boolean canContainDeletedFiles(ManifestFile manifest) {
- boolean canContainExpressionDeletes;
- if (deleteExpression != null && deleteExpression !=
Expressions.alwaysFalse()) {
- ManifestEvaluator manifestEvaluator =
- ManifestEvaluator.forRowFilter(
- deleteExpression, specsById.get(manifest.partitionSpecId()),
caseSensitive);
- canContainExpressionDeletes = manifestEvaluator.eval(manifest);
- } else {
- canContainExpressionDeletes = false;
- }
-
- boolean canContainDroppedPartitions;
- if (!dropPartitions.isEmpty()) {
- canContainDroppedPartitions =
- ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
- } else {
- canContainDroppedPartitions = false;
- }
-
- boolean canContainDroppedFiles;
- if (!deletePaths.isEmpty()) {
- canContainDroppedFiles = true;
- } else if (!deleteFiles.isEmpty()) {
- // because there were no path-only deletes, the set of deleted file
partitions is valid
- canContainDroppedFiles =
- ManifestFileUtil.canContainAny(manifest, deleteFilePartitions,
specsById);
- } else {
- canContainDroppedFiles = false;
- }
-
- boolean canContainDropBySeq =
- manifest.content() == ManifestContent.DELETES &&
manifest.minSequenceNumber() < minSequenceNumber;
-
- return canContainExpressionDeletes || canContainDroppedPartitions ||
canContainDroppedFiles || canContainDropBySeq;
- }
-
- @SuppressWarnings({"CollectionUndefinedEquality",
"checkstyle:CyclomaticComplexity"})
- private boolean manifestHasDeletedFiles(
- PartitionAndMetricsEvaluator evaluator, ManifestReader<F> reader) {
- boolean isDelete = reader.isDeleteManifestReader();
-
- for (ManifestEntry<F> entry : reader.liveEntries()) {
- F file = entry.file();
- boolean markedForDelete =
- deletePaths.contains(file.location()) || deleteFiles.contains(file)
||
- dropPartitions.contains(file.specId(), file.partition()) ||
- isDelete && entry.isLive() && entry.dataSequenceNumber() > 0 &&
- entry.dataSequenceNumber() < minSequenceNumber;
-
- if (markedForDelete || evaluator.rowsMightMatch(file)) {
- boolean allRowsMatch = markedForDelete ||
evaluator.rowsMustMatch(file);
- ValidationException.check(
- allRowsMatch || isDelete, // ignore delete files where some
records may not match the expression
- "Cannot delete file where some, but not all, rows match filter %s:
%s",
- this.deleteExpression,
- file.location());
-
- if (allRowsMatch) {
- if (failAnyDelete) {
- throw new
DeleteException(reader.spec().partitionToPath(file.partition()));
- }
-
- // as soon as a deleted file is detected, stop scanning
- return true;
- }
- }
- }
-
- return false;
- }
-
- @SuppressWarnings({"CollectionUndefinedEquality",
"checkstyle:CyclomaticComplexity"})
- private ManifestFile filterManifestWithDeletedFiles(
- PartitionAndMetricsEvaluator evaluator, ManifestFile manifest,
ManifestReader<F> reader) {
- boolean isDelete = reader.isDeleteManifestReader();
- // when this point is reached, there is at least one file that will be
deleted in the
- // manifest. produce a copy of the manifest with all deleted files removed.
- Set<F> deletedFiles = newFileSet();
-
- try {
- ManifestWriter<F> writer = newManifestWriter(reader.spec());
- try {
- reader
- .liveEntries()
- .forEach(
- entry -> {
- F file = entry.file();
- boolean markedForDelete =
- deletePaths.contains(file.location()) ||
deleteFiles.contains(file) ||
- dropPartitions.contains(file.specId(),
file.partition()) ||
- isDelete && entry.isLive() &&
entry.dataSequenceNumber() > 0 &&
- entry.dataSequenceNumber() < minSequenceNumber;
- if (markedForDelete || evaluator.rowsMightMatch(file)) {
- boolean allRowsMatch = markedForDelete ||
evaluator.rowsMustMatch(file);
- ValidationException.check(
- allRowsMatch || isDelete, // ignore delete files where
some records may not match
- // the expression
- "Cannot delete file where some, but not all, rows
match filter %s: %s",
- this.deleteExpression,
- file.location());
-
- if (allRowsMatch) {
- writer.delete(entry);
-
- if (deletedFiles.contains(file)) {
- LOG.warn(
- "Deleting a duplicate path from manifest {}: {}",
- manifest.path(),
- file.location());
- duplicateDeleteCount += 1;
- } else {
- // only add the file to deletes if it is a new delete
- // this keeps the snapshot summary accurate for
non-duplicate data
- deletedFiles.add(file.copyWithoutStats());
- }
- } else {
- writer.existing(entry);
- }
-
- } else {
- writer.existing(entry);
- }
- });
- } finally {
- writer.close();
- }
-
- // return the filtered manifest as a reader
- ManifestFile filtered = writer.toManifestFile();
-
- // update caches
- filteredManifests.put(manifest, filtered);
- filteredManifestToDeletedFiles.put(filtered, deletedFiles);
-
- return filtered;
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close manifest writer");
- }
- }
-
- // an evaluator that checks whether rows in a file may/must match a given
expression
- // this class first partially evaluates the provided expression using the
partition tuple
- // and then checks the remaining part of the expression using metrics
evaluators
- private class PartitionAndMetricsEvaluator {
- private final Schema tableSchema;
- private final ResidualEvaluator residualEvaluator;
- private final StructLikeMap<Pair<InclusiveMetricsEvaluator,
StrictMetricsEvaluator>>
- metricsEvaluators;
-
- PartitionAndMetricsEvaluator(Schema tableSchema, PartitionSpec spec,
Expression expr) {
- this.tableSchema = tableSchema;
- this.residualEvaluator = ResidualEvaluator.of(spec, expr, caseSensitive);
- this.metricsEvaluators = StructLikeMap.create(spec.partitionType());
- }
-
- boolean rowsMightMatch(F file) {
- Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> evaluators =
metricsEvaluators(file);
- InclusiveMetricsEvaluator inclusiveMetricsEvaluator = evaluators.first();
- return inclusiveMetricsEvaluator.eval(file);
- }
-
- boolean rowsMustMatch(F file) {
- Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> evaluators =
metricsEvaluators(file);
- StrictMetricsEvaluator strictMetricsEvaluator = evaluators.second();
- return strictMetricsEvaluator.eval(file);
- }
-
- private Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator>
metricsEvaluators(F file) {
- // ResidualEvaluator removes predicates in the expression using
strict/inclusive projections
- // if strict projection returns true -> the pred would return true ->
replace the pred with
- // true
- // if inclusive projection returns false -> the pred would return false
-> replace the pred
- // with false
- // otherwise, keep the original predicate and proceed to other
predicates in the expression
- // in other words, ResidualEvaluator returns a part of the expression
that needs to be
- // evaluated
- // for rows in the given partition using metrics
- PartitionData partition = (PartitionData) file.partition();
- if (!metricsEvaluators.containsKey(partition)) {
- Expression residual = residualEvaluator.residualFor(partition);
- InclusiveMetricsEvaluator inclusive =
- new InclusiveMetricsEvaluator(tableSchema, residual,
caseSensitive);
- StrictMetricsEvaluator strict =
- new StrictMetricsEvaluator(tableSchema, residual, caseSensitive);
-
- metricsEvaluators.put(
- partition.copy(), // The partition may be a re-used container so a
copy is required
- Pair.of(inclusive, strict));
- }
- return metricsEvaluators.get(partition);
- }
- }
-}
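
Note: the comment block near the end of the removed class describes how the residual of the delete expression is evaluated against a file's metrics. A hedged, self-contained sketch of that pattern follows; the schema, spec, partition tuple, data file and the filter expression are assumed inputs, and only evaluator APIs imported by the deleted class are used.

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
    import org.apache.iceberg.expressions.ResidualEvaluator;
    import org.apache.iceberg.expressions.StrictMetricsEvaluator;

    class DeleteFilterSketch {
      static void evaluate(Schema tableSchema, PartitionSpec spec, DataFile file, StructLike partition) {
        boolean caseSensitive = true;
        Expression deleteExpr = Expressions.equal("event_date", "2025-01-01"); // hypothetical filter
        // Remove predicates already decided by the partition tuple (strict -> true, inclusive -> false).
        Expression residual = ResidualEvaluator.of(spec, deleteExpr, caseSensitive).residualFor(partition);
        // rowsMightMatch: inclusive metrics say the file may contain matching rows.
        boolean mightMatch = new InclusiveMetricsEvaluator(tableSchema, residual, caseSensitive).eval(file);
        // rowsMustMatch: strict metrics prove every row matches, so the whole file can be dropped.
        boolean mustMatch = new StrictMetricsEvaluator(tableSchema, residual, caseSensitive).eval(file);
        System.out.printf("mightMatch=%s mustMatch=%s%n", mightMatch, mustMatch);
      }
    }
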
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
deleted file mode 100644
index a1f31e28443..00000000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.io.CloseableIterable;
-import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ParallelIterable;
-import org.apache.iceberg.util.PartitionUtil;
-import org.apache.iceberg.util.StructLikeMap;
-import org.apache.iceberg.util.StructProjection;
-
-/** A {@link Table} implementation that exposes a table's partitions as rows.
*/
-public class PartitionsTable extends BaseMetadataTable {
-
- private final Schema schema;
-
- private final boolean unpartitionedTable;
-
- PartitionsTable(Table table) {
- this(table, table.name() + ".partitions");
- }
-
- PartitionsTable(Table table, String name) {
- super(table, name);
-
- this.schema =
- new Schema(
- Types.NestedField.required(1, "partition",
Partitioning.partitionType(table)),
- Types.NestedField.required(4, "spec_id", Types.IntegerType.get()),
- Types.NestedField.required(
- 2, "record_count", Types.LongType.get(), "Count of records in
data files"),
- Types.NestedField.required(
- 3, "file_count", Types.IntegerType.get(), "Count of data
files"),
- Types.NestedField.required(
- 11,
- "total_data_file_size_in_bytes",
- Types.LongType.get(),
- "Total size in bytes of data files"),
- Types.NestedField.required(
- 5,
- "position_delete_record_count",
- Types.LongType.get(),
- "Count of records in position delete files"),
- Types.NestedField.required(
- 6,
- "position_delete_file_count",
- Types.IntegerType.get(),
- "Count of position delete files"),
- Types.NestedField.required(
- 7,
- "equality_delete_record_count",
- Types.LongType.get(),
- "Count of records in equality delete files"),
- Types.NestedField.required(
- 8,
- "equality_delete_file_count",
- Types.IntegerType.get(),
- "Count of equality delete files"),
- Types.NestedField.optional(
- 9,
- "last_updated_at",
- Types.TimestampType.withZone(),
- "Commit time of snapshot that last updated this partition"),
- Types.NestedField.optional(
- 10,
- "last_updated_snapshot_id",
- Types.LongType.get(),
- "Id of snapshot that last updated this partition"));
- this.unpartitionedTable =
Partitioning.partitionType(table).fields().isEmpty();
- }
-
- @Override
- public TableScan newScan() {
- return new PartitionsScan(table());
- }
-
- @Override
- public Schema schema() {
- if (unpartitionedTable) {
- return schema.select(
- "record_count",
- "file_count",
- "total_data_file_size_in_bytes",
- "position_delete_record_count",
- "position_delete_file_count",
- "equality_delete_record_count",
- "equality_delete_file_count",
- "last_updated_at",
- "last_updated_snapshot_id");
- }
- return schema;
- }
-
- @Override
- MetadataTableType metadataTableType() {
- return MetadataTableType.PARTITIONS;
- }
-
- private DataTask task(StaticTableScan scan) {
- Iterable<Partition> partitions = partitions(table(), scan);
- if (unpartitionedTable) {
- // the table is unpartitioned, partitions contains only the root
partition
- return StaticDataTask.of(
-
io().newInputFile(table().operations().current().metadataFileLocation()),
- schema(),
- scan.schema(),
- partitions,
- root ->
- StaticDataTask.Row.of(
- root.dataRecordCount,
- root.dataFileCount,
- root.dataFileSizeInBytes,
- root.posDeleteRecordCount,
- root.posDeleteFileCount,
- root.eqDeleteRecordCount,
- root.eqDeleteFileCount,
- root.lastUpdatedAt,
- root.lastUpdatedSnapshotId));
- } else {
- return StaticDataTask.of(
-
io().newInputFile(table().operations().current().metadataFileLocation()),
- schema(),
- scan.schema(),
- partitions,
- PartitionsTable::convertPartition);
- }
- }
-
- private static StaticDataTask.Row convertPartition(Partition partition) {
- return StaticDataTask.Row.of(
- partition.partitionData,
- partition.specId,
- partition.dataRecordCount,
- partition.dataFileCount,
- partition.dataFileSizeInBytes,
- partition.posDeleteRecordCount,
- partition.posDeleteFileCount,
- partition.eqDeleteRecordCount,
- partition.eqDeleteFileCount,
- partition.lastUpdatedAt,
- partition.lastUpdatedSnapshotId);
- }
-
- private static Iterable<Partition> partitions(Table table, StaticTableScan
scan) {
- Types.StructType partitionType = Partitioning.partitionType(table);
-
- StructLikeMap<Partition> partitions =
- StructLikeMap.create(partitionType, new
PartitionComparator(partitionType));
-
- try (CloseableIterable<ManifestEntry<? extends ContentFile<?>>> entries =
planEntries(scan)) {
- for (ManifestEntry<? extends ContentFile<?>> entry : entries) {
- Snapshot snapshot = table.snapshot(entry.snapshotId());
- ContentFile<?> file = entry.file();
- StructLike key =
- PartitionUtil.coercePartition(
- partitionType, table.specs().get(file.specId()),
file.partition());
- partitions
- .computeIfAbsent(key, () -> new Partition(key, partitionType))
- .update(file, snapshot);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- return partitions.values();
- }
-
- @VisibleForTesting
- static CloseableIterable<ManifestEntry<?>> planEntries(StaticTableScan scan)
{
- Table table = scan.table();
-
- CloseableIterable<ManifestFile> filteredManifests =
- filteredManifests(scan, table,
scan.snapshot().allManifests(table.io()));
-
- Iterable<CloseableIterable<ManifestEntry<?>>> tasks =
- CloseableIterable.transform(filteredManifests, manifest ->
readEntries(manifest, scan));
-
- return new ParallelIterable<>(tasks, scan.planExecutor());
- }
-
- private static CloseableIterable<ManifestEntry<?>> readEntries(
- ManifestFile manifest, StaticTableScan scan) {
- Table table = scan.table();
- return CloseableIterable.transform(
- ManifestFiles.open(manifest, table.io(), table.specs())
- .caseSensitive(scan.isCaseSensitive())
- .select(BaseScan.scanColumns(manifest.content())) // don't select
stats columns
- .liveEntries(),
- t ->
- (ManifestEntry<? extends ContentFile<?>>)
- // defensive copy of manifest entry without stats columns
- t.copyWithoutStats());
- }
-
- private static CloseableIterable<ManifestFile> filteredManifests(
- StaticTableScan scan, Table table, List<ManifestFile> manifestFilesList)
{
- CloseableIterable<ManifestFile> manifestFiles =
- CloseableIterable.withNoopClose(manifestFilesList);
-
- LoadingCache<Integer, ManifestEvaluator> evalCache =
- Caffeine.newBuilder()
- .build(
- specId -> {
- PartitionSpec spec = table.specs().get(specId);
- PartitionSpec transformedSpec =
transformSpec(scan.tableSchema(), spec);
- return ManifestEvaluator.forRowFilter(
- scan.filter(), transformedSpec, scan.isCaseSensitive());
- });
-
- return CloseableIterable.filter(
- manifestFiles, manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest));
- }
-
- private class PartitionsScan extends StaticTableScan {
- PartitionsScan(Table table) {
- super(
- table,
- PartitionsTable.this.schema(),
- MetadataTableType.PARTITIONS,
- PartitionsTable.this::task);
- }
- }
-
- private static class PartitionComparator implements Comparator<StructLike> {
- private Comparator<StructLike> comparator;
-
- private PartitionComparator(Types.StructType struct) {
- this.comparator = Comparators.forType(struct);
- }
-
- @Override
- public int compare(StructLike o1, StructLike o2) {
- if (o1 instanceof StructProjection && o2 instanceof StructProjection) {
- int cmp =
- Integer.compare(
- ((StructProjection) o1).projectedFields(),
- ((StructProjection) o2).projectedFields());
- if (cmp != 0) {
- return cmp;
- }
- }
- return comparator.compare(o1, o2);
- }
- }
-
- static class Partition {
- private final PartitionData partitionData;
- private int specId;
- private long dataRecordCount;
- private int dataFileCount;
- private long dataFileSizeInBytes;
- private long posDeleteRecordCount;
- private int posDeleteFileCount;
- private long eqDeleteRecordCount;
- private int eqDeleteFileCount;
- private Long lastUpdatedAt;
- private Long lastUpdatedSnapshotId;
-
- Partition(StructLike key, Types.StructType keyType) {
- this.partitionData = toPartitionData(key, keyType);
- this.specId = 0;
- this.dataRecordCount = 0L;
- this.dataFileCount = 0;
- this.dataFileSizeInBytes = 0L;
- this.posDeleteRecordCount = 0L;
- this.posDeleteFileCount = 0;
- this.eqDeleteRecordCount = 0L;
- this.eqDeleteFileCount = 0;
- }
-
- void update(ContentFile<?> file, Snapshot snapshot) {
- if (snapshot != null) {
- long snapshotCommitTime = snapshot.timestampMillis() * 1000;
- if (this.lastUpdatedAt == null || snapshotCommitTime >
this.lastUpdatedAt) {
- this.specId = file.specId();
-
- this.lastUpdatedAt = snapshotCommitTime;
- this.lastUpdatedSnapshotId = snapshot.snapshotId();
- }
- }
- switch (file.content()) {
- case DATA:
- this.dataRecordCount += file.recordCount();
- this.dataFileCount += 1;
- this.dataFileSizeInBytes += file.fileSizeInBytes();
- break;
- case POSITION_DELETES:
- this.posDeleteRecordCount += file.recordCount();
- this.posDeleteFileCount += 1;
- break;
- case EQUALITY_DELETES:
- this.eqDeleteRecordCount += file.recordCount();
- this.eqDeleteFileCount += 1;
- break;
- default:
- throw new UnsupportedOperationException(
- "Unsupported file content type: " + file.content());
- }
- }
-
- /** Needed because StructProjection is not serializable */
- private static PartitionData toPartitionData(StructLike key,
Types.StructType keyType) {
- PartitionData keyTemplate = new PartitionData(keyType);
- return keyTemplate.copyFor(key);
- }
- }
-}
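
Note: the removed PartitionsTable copy is superseded by the upstream 1.10.0 implementation. A hedged sketch of reading the same metadata table through the public API follows; MetadataTableUtils is assumed to be available from iceberg-core, and the base table is an input.

    import java.io.IOException;
    import org.apache.iceberg.DataTask;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.MetadataTableType;
    import org.apache.iceberg.MetadataTableUtils;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class PartitionsTableSketch {
      static void dump(Table table) throws IOException {
        Table partitions = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
        try (CloseableIterable<FileScanTask> tasks = partitions.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            // The partitions table is served as data-task rows (see task(...) above); the column
            // layout depends on whether the base table is partitioned (see schema() above).
            for (StructLike row : ((DataTask) task).rows()) {
              System.out.println("partition row with " + row.size() + " columns");
            }
          }
        }
      }
    }
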
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
deleted file mode 100644
index 0efc9e44680..00000000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeMap.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.util;
-
-import java.util.AbstractMap;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types;
-
-public class StructLikeMap<T> extends AbstractMap<StructLike, T> implements
Map<StructLike, T> {
-
- public static <T> StructLikeMap<T> create(
- Types.StructType type, Comparator<StructLike> comparator) {
- return new StructLikeMap<>(type, comparator);
- }
-
- public static <T> StructLikeMap<T> create(Types.StructType type) {
- return create(type, Comparators.forType(type));
- }
-
- private final Types.StructType type;
- private final Map<StructLikeWrapper, T> wrapperMap;
- private final ThreadLocal<StructLikeWrapper> wrappers;
-
- private StructLikeMap(Types.StructType type, Comparator<StructLike>
comparator) {
- this.type = type;
- this.wrapperMap = Maps.newHashMap();
- this.wrappers = ThreadLocal.withInitial(() ->
StructLikeWrapper.forType(type, comparator));
- }
-
- @Override
- public int size() {
- return wrapperMap.size();
- }
-
- @Override
- public boolean isEmpty() {
- return wrapperMap.isEmpty();
- }
-
- @Override
- public boolean containsKey(Object key) {
- if (key instanceof StructLike || key == null) {
- StructLikeWrapper wrapper = wrappers.get();
- boolean result = wrapperMap.containsKey(wrapper.set((StructLike) key));
- wrapper.set(null); // don't hold a reference to the key.
- return result;
- }
- return false;
- }
-
- @Override
- public boolean containsValue(Object value) {
- return wrapperMap.containsValue(value);
- }
-
- @Override
- public T get(Object key) {
- if (key instanceof StructLike || key == null) {
- StructLikeWrapper wrapper = wrappers.get();
- T value = wrapperMap.get(wrapper.set((StructLike) key));
- wrapper.set(null); // don't hold a reference to the key.
- return value;
- }
- return null;
- }
-
- @Override
- public T put(StructLike key, T value) {
- return wrapperMap.put(wrappers.get().copyFor(key), value);
- }
-
- @Override
- public T remove(Object key) {
- if (key instanceof StructLike || key == null) {
- StructLikeWrapper wrapper = wrappers.get();
- T value = wrapperMap.remove(wrapper.set((StructLike) key));
- wrapper.set(null); // don't hold a reference to the key.
- return value;
- }
- return null;
- }
-
- @Override
- public void clear() {
- wrapperMap.clear();
- }
-
- @Override
- public Set<StructLike> keySet() {
- StructLikeSet keySet = StructLikeSet.create(type);
- for (StructLikeWrapper wrapper : wrapperMap.keySet()) {
- keySet.add(wrapper.get());
- }
- return keySet;
- }
-
- @Override
- public Collection<T> values() {
- return wrapperMap.values();
- }
-
- @Override
- public Set<Entry<StructLike, T>> entrySet() {
- Set<Entry<StructLike, T>> entrySet = Sets.newHashSet();
- for (Entry<StructLikeWrapper, T> entry : wrapperMap.entrySet()) {
- entrySet.add(new StructLikeEntry<>(entry));
- }
- return entrySet;
- }
-
- public T computeIfAbsent(StructLike struct, Supplier<T> valueSupplier) {
- return wrapperMap.computeIfAbsent(wrappers.get().copyFor(struct), key ->
valueSupplier.get());
- }
-
- private static class StructLikeEntry<R> implements Entry<StructLike, R> {
-
- private final Entry<StructLikeWrapper, R> inner;
-
- private StructLikeEntry(Entry<StructLikeWrapper, R> inner) {
- this.inner = inner;
- }
-
- @Override
- public StructLike getKey() {
- return inner.getKey().get();
- }
-
- @Override
- public R getValue() {
- return inner.getValue();
- }
-
- @Override
- public int hashCode() {
- return inner.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- } else if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StructLikeEntry<?> that = (StructLikeEntry<?>) o;
- return inner.equals(that.inner);
- }
-
- @Override
- public R setValue(R value) {
- throw new UnsupportedOperationException("Does not support setValue.");
- }
- }
-
- public <U> StructLikeMap<U> transformValues(Function<T, U> func) {
- StructLikeMap<U> result = create(type);
- wrapperMap.forEach((key, value) -> result.put(key.get(),
func.apply(value)));
- return result;
- }
-}
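
Note: StructLikeMap (now taken from upstream Iceberg 1.10.0) is a Map keyed by StructLike values that uses struct equality rather than object identity. A minimal sketch with a hypothetical one-column key type, using only the methods shown in the deleted class:

    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructLikeMap;

    class StructLikeMapSketch {
      // A tiny one-column StructLike row used only for this example.
      static StructLike row(long value) {
        return new StructLike() {
          @Override public int size() { return 1; }
          @Override public <T> T get(int pos, Class<T> javaClass) { return javaClass.cast(value); }
          @Override public <T> void set(int pos, T v) { throw new UnsupportedOperationException(); }
        };
      }

      public static void main(String[] args) {
        Types.StructType keyType = Types.StructType.of(
            Types.NestedField.required(1, "bucket", Types.LongType.get()));
        StructLikeMap<String> entries = StructLikeMap.create(keyType);
        entries.put(row(7L), "first");
        // A different StructLike instance with the same values resolves to the same entry.
        System.out.println(entries.get(row(7L))); // prints "first"
      }
    }
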
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
deleted file mode 100644
index 3dbb91a43ce..00000000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/util/StructLikeWrapper.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.util;
-
-import java.util.Comparator;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.JavaHash;
-import org.apache.iceberg.types.Types;
-
-/** Wrapper to adapt StructLike for use in maps and sets by implementing
equals and hashCode. */
-public class StructLikeWrapper {
-
- public static StructLikeWrapper forType(
- Types.StructType type, Comparator<StructLike> comparator) {
- return new StructLikeWrapper(comparator, JavaHash.forType(type));
- }
-
- public static StructLikeWrapper forType(Types.StructType type) {
- return forType(type, Comparators.forType(type));
- }
-
- private final Comparator<StructLike> comparator;
- private final JavaHash<StructLike> structHash;
- private Integer hashCode;
- private StructLike struct;
-
- private StructLikeWrapper(Comparator<StructLike> comparator,
JavaHash<StructLike> structHash) {
- this.comparator = comparator;
- this.structHash = structHash;
- this.hashCode = null;
- }
-
- /**
- * Creates a copy of this wrapper that wraps a struct.
- *
- * <p>This is equivalent to {@code new
StructLikeWrapper(type).set(newStruct)} but is cheaper
- * because no analysis of the type is necessary.
- *
- * @param newStruct a {@link StructLike} row
- * @return a copy of this wrapper wrapping the given struct
- public StructLikeWrapper copyFor(StructLike newStruct) {
- return new StructLikeWrapper(comparator, structHash).set(newStruct);
- }
-
- public StructLikeWrapper set(StructLike newStruct) {
- this.struct = newStruct;
- this.hashCode = null;
- return this;
- }
-
- public StructLike get() {
- return struct;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- } else if (!(other instanceof StructLikeWrapper)) {
- return false;
- }
-
- StructLikeWrapper that = (StructLikeWrapper) other;
-
- if (this.struct == that.struct) {
- return true;
- }
-
- if (this.struct == null ^ that.struct == null) {
- return false;
- }
-
- return comparator.compare(this.struct, that.struct) == 0;
- }
-
- @Override
- public int hashCode() {
- if (hashCode == null) {
- this.hashCode = structHash.hash(struct);
- }
-
- return hashCode;
- }
-}
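
Note: as the copyFor(...) Javadoc above explains, a single wrapper can be reused as a lookup probe while copyFor(...) produces an independent copy for storage. A small hedged sketch; the key type and the two rows are assumed inputs.

    import java.util.Set;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructLikeWrapper;

    class StructLikeWrapperSketch {
      static void example(Types.StructType type, StructLike a, StructLike b) {
        StructLikeWrapper probe = StructLikeWrapper.forType(type);
        Set<StructLikeWrapper> seen = Sets.newHashSet();
        seen.add(probe.copyFor(a));               // stored copy keeps its own reference to `a`
        boolean dup = seen.contains(probe.set(b)); // reuse the probe for the lookup
        probe.set(null);                           // drop the key reference, as the deleted map code does
        System.out.println(dup);
      }
    }
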
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index 10ce7e12c1c..1c5bfed17c0 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -26,7 +26,7 @@
<hive.path.to.root>..</hive.path.to.root>
<path.to.iceberg.root>.</path.to.iceberg.root>
<!-- Upgrade roaringbit version in parent pom.xml whenever upgrading
iceberg version -->
- <iceberg.version>1.9.1</iceberg.version>
+ <iceberg.version>1.10.0</iceberg.version>
<kryo-shaded.version>4.0.3</kryo-shaded.version>
<iceberg.mockito-core.version>5.2.0</iceberg.mockito-core.version>
<iceberg.avro.version>1.12.0</iceberg.avro.version>
diff --git a/pom.xml b/pom.xml
index 6247b18bf2c..3630ea868e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,8 +151,8 @@
<httpcomponents.client.version>4.5.13</httpcomponents.client.version>
<httpcomponents.core.version>4.4.13</httpcomponents.core.version>
<immutables.version>2.9.2</immutables.version>
- <httpcomponents5.core.version>5.3.1</httpcomponents5.core.version>
- <httpcomponents5.client.version>5.3.1</httpcomponents5.client.version>
+ <httpcomponents5.core.version>5.3.4</httpcomponents5.core.version>
+ <httpcomponents5.client.version>5.5</httpcomponents5.client.version>
<ivy.version>2.5.2</ivy.version>
<jackson.version>2.16.1</jackson.version>
<jamon.plugin.version>2.3.4</jamon.plugin.version>
@@ -195,7 +195,7 @@
<!-- used by druid storage handler -->
<pac4j-saml.version>4.5.8</pac4j-saml.version>
<paranamer.version>2.8</paranamer.version>
- <parquet.version>1.15.2</parquet.version>
+ <parquet.version>1.16.0</parquet.version>
<pig.version>0.16.0</pig.version>
<plexus.version>1.5.6</plexus.version>
<protobuf.version>3.25.5</protobuf.version>
@@ -597,7 +597,7 @@
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
- <version>${httpcomponents5.core.version}</version>
+ <version>${httpcomponents5.client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
diff --git a/standalone-metastore/metastore-rest-catalog/pom.xml b/standalone-metastore/metastore-rest-catalog/pom.xml
index c1468e4f0c1..896723d2276 100644
--- a/standalone-metastore/metastore-rest-catalog/pom.xml
+++ b/standalone-metastore/metastore-rest-catalog/pom.xml
@@ -23,7 +23,7 @@
<standalone.metastore.path.to.root>..</standalone.metastore.path.to.root>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<log4j2.debug>false</log4j2.debug>
- <iceberg.version>1.9.1</iceberg.version>
+ <iceberg.version>1.10.0</iceberg.version>
</properties>
<dependencies>
<dependency>