This is an automated email from the ASF dual-hosted git repository.
collado pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 07edd301f Add properties from TableMetadata into Table entity internalProperties (#2735)
07edd301f is described below
commit 07edd301f6cbb87ff2cf875bc5ea0b99875acded
Author: Michael Collado <[email protected]>
AuthorDate: Thu Oct 23 09:19:18 2025 -0700
Add properties from TableMetadata into Table entity internalProperties (#2735)
* Add properties from TableMetadata into Table entity internalProperties
* Made table properties constants and pulled out static utility method
---
.../apache/polaris/core/entity/PolarisEntity.java | 2 +-
.../core/entity/table/IcebergTableLikeEntity.java | 78 ++++++++++++++++++
.../core/persistence/cache/EntityWeigherTest.java | 2 +-
.../service/catalog/iceberg/IcebergCatalog.java | 43 +++++++++-
.../iceberg/AbstractIcebergCatalogTest.java | 95 ++++++++++++++++++++++
5 files changed, 217 insertions(+), 3 deletions(-)
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java
index 98701af45..1cd4d5947 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java
@@ -411,7 +411,7 @@ public class PolarisEntity extends PolarisBaseEntity {
return (B) this;
}
- public B setInternalProperties(Map<String, String> internalProperties) {
+ public B setInternalProperties(@Nonnull Map<String, String>
internalProperties) {
this.internalProperties = new HashMap<>(internalProperties);
return (B) this;
}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java
index ea7af6363..f1b756779 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/table/IcebergTableLikeEntity.java
@@ -20,7 +20,10 @@ package org.apache.polaris.core.entity.table;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +50,40 @@ public class IcebergTableLikeEntity extends TableLikeEntity {
public static final String LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY =
"last-notification-timestamp";
+ /*
+ * The following constants are copied from the TableMetadataParser in Iceberg
+ * They represent the keys used in the table metadata JSON file.
+ */
+
+ public static final String FORMAT_VERSION = "format-version";
+ public static final String TABLE_UUID = "table-uuid";
+ public static final String LOCATION = "location";
+ public static final String LAST_SEQUENCE_NUMBER = "last-sequence-number";
+ public static final String LAST_UPDATED_MILLIS = "last-updated-ms";
+ public static final String LAST_COLUMN_ID = "last-column-id";
+ public static final String SCHEMA = "schema";
+ public static final String SCHEMAS = "schemas";
+ public static final String CURRENT_SCHEMA_ID = "current-schema-id";
+ public static final String PARTITION_SPEC = "partition-spec";
+ public static final String PARTITION_SPECS = "partition-specs";
+ public static final String DEFAULT_SPEC_ID = "default-spec-id";
+ public static final String LAST_PARTITION_ID = "last-partition-id";
+ public static final String DEFAULT_SORT_ORDER_ID = "default-sort-order-id";
+ public static final String SORT_ORDERS = "sort-orders";
+ public static final String PROPERTIES = "properties";
+ public static final String CURRENT_SNAPSHOT_ID = "current-snapshot-id";
+ public static final String REFS = "refs";
+ public static final String SNAPSHOTS = "snapshots";
+ public static final String SNAPSHOT_ID = "snapshot-id";
+ public static final String TIMESTAMP_MS = "timestamp-ms";
+ public static final String SNAPSHOT_LOG = "snapshot-log";
+ public static final String METADATA_FILE = "metadata-file";
+ public static final String METADATA_LOG = "metadata-log";
+ public static final String STATISTICS = "statistics";
+ public static final String PARTITION_STATISTICS = "partition-statistics";
+ public static final String ENCRYPTION_KEYS = "encryption-keys";
+ public static final String NEXT_ROW_ID = "next-row-id";
+
public IcebergTableLikeEntity(PolarisBaseEntity sourceEntity) {
super(sourceEntity);
PolarisEntitySubType subType = getSubType();
@@ -83,6 +120,24 @@ public class IcebergTableLikeEntity extends TableLikeEntity {
}
public static class Builder extends
PolarisEntity.BaseBuilder<IcebergTableLikeEntity, Builder> {
+
+ public Builder(
+ PolarisEntitySubType subType,
+ TableIdentifier identifier,
+ Map<String, String> properties,
+ Map<String, String> internalProperties,
+ String metadataLocation) {
+ super();
+ setType(PolarisEntityType.TABLE_LIKE);
+ setSubType(subType);
+ setProperties(properties);
+ setInternalProperties(internalProperties);
+ // order here matters. properties and internal properties must be set
prior to the following
+ // properties, which merely update the map, whereas the above calls
replace the map entirely.
+ setTableIdentifier(identifier);
+ setMetadataLocation(metadataLocation);
+ }
+
public Builder(
PolarisEntitySubType subType, TableIdentifier identifier, String
metadataLocation) {
super();
@@ -121,6 +176,29 @@ public class IcebergTableLikeEntity extends TableLikeEntity {
return this;
}
+ @Override
+ public Builder setInternalProperties(@Nonnull Map<String, String>
internalProperties) {
+ // ensure we carry forward the parent namespace and metadata location if
already set.
+ // however, we allow for overriding them if explicitly specified in the
provided map.
+ Map<String, String> newInternalProperties = new HashMap<>();
+ if (this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY) !=
null) {
+ newInternalProperties.put(
+ NamespaceEntity.PARENT_NAMESPACE_KEY,
+ this.internalProperties.get(NamespaceEntity.PARENT_NAMESPACE_KEY));
+ }
+ if (this.internalProperties.get(METADATA_LOCATION_KEY) != null) {
+ newInternalProperties.put(
+ METADATA_LOCATION_KEY,
this.internalProperties.get(METADATA_LOCATION_KEY));
+ }
+ if
(this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY) != null)
{
+ newInternalProperties.put(
+ LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY,
+
this.internalProperties.get(LAST_ADMITTED_NOTIFICATION_TIMESTAMP_KEY));
+ }
+ newInternalProperties.putAll(internalProperties);
+ return super.setInternalProperties(newInternalProperties);
+ }
+
public Builder setMetadataLocation(String location) {
internalProperties.put(METADATA_LOCATION_KEY, location);
return this;
diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java
index 109a52172..780fbc944 100644
--- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java
+++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/EntityWeigherTest.java
@@ -124,7 +124,7 @@ public class EntityWeigherTest {
"location",
"{\"a\": \"b\"}",
Optional.of("{\"c\": \"d\", \"e\": \"f\"}")));
- Assertions.assertThat(preciseWeight).isEqualTo(1090);
+ Assertions.assertThat(preciseWeight).isEqualTo(1183); // :( this is
hard-coded
}
private static Map<String, String> getPropertiesMap(String properties) {
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index c3e248b52..633bca5f9 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -1493,6 +1493,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
"Generic table with same name already exists: %s",
tableIdentifier);
}
}
+ Map<String, String> storedProperties =
buildTableMetadataPropertiesMap(metadata);
IcebergTableLikeEntity entity =
IcebergTableLikeEntity.of(resolvedPath == null ? null :
resolvedPath.getRawLeafEntity());
String existingLocation;
@@ -1500,7 +1501,11 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
existingLocation = null;
entity =
new IcebergTableLikeEntity.Builder(
- PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier,
newLocation)
+ PolarisEntitySubType.ICEBERG_TABLE,
+ tableIdentifier,
+ Map.of(),
+ storedProperties,
+ newLocation)
.setCatalogId(getCatalogId())
.setBaseLocation(metadata.location())
.setId(
@@ -1510,6 +1515,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
existingLocation = entity.getMetadataLocation();
entity =
new IcebergTableLikeEntity.Builder(entity)
+ .setInternalProperties(storedProperties)
.setBaseLocation(metadata.location())
.setMetadataLocation(newLocation)
.build();
@@ -1639,6 +1645,41 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
}
}
+ private static Map<String, String>
buildTableMetadataPropertiesMap(TableMetadata metadata) {
+ Map<String, String> storedProperties = new HashMap<>();
+ storedProperties.put(IcebergTableLikeEntity.LOCATION, metadata.location());
+ storedProperties.put(
+ IcebergTableLikeEntity.FORMAT_VERSION,
String.valueOf(metadata.formatVersion()));
+ storedProperties.put(IcebergTableLikeEntity.TABLE_UUID, metadata.uuid());
+ storedProperties.put(
+ IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
String.valueOf(metadata.currentSchemaId()));
+ if (metadata.currentSnapshot() != null) {
+ storedProperties.put(
+ IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
+ String.valueOf(metadata.currentSnapshot().snapshotId()));
+ }
+ storedProperties.put(
+ IcebergTableLikeEntity.LAST_COLUMN_ID,
String.valueOf(metadata.lastColumnId()));
+ storedProperties.put(IcebergTableLikeEntity.NEXT_ROW_ID,
String.valueOf(metadata.nextRowId()));
+ storedProperties.put(
+ IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER,
String.valueOf(metadata.lastSequenceNumber()));
+ storedProperties.put(
+ IcebergTableLikeEntity.LAST_UPDATED_MILLIS,
String.valueOf(metadata.lastUpdatedMillis()));
+ if (metadata.sortOrder() != null) {
+ storedProperties.put(
+ IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID,
+ String.valueOf(metadata.defaultSortOrderId()));
+ }
+ if (metadata.spec() != null) {
+ storedProperties.put(
+ IcebergTableLikeEntity.DEFAULT_SPEC_ID,
String.valueOf(metadata.defaultSpecId()));
+ storedProperties.put(
+ IcebergTableLikeEntity.LAST_PARTITION_ID,
+ String.valueOf(metadata.lastAssignedPartitionId()));
+ }
+ return storedProperties;
+ }
+
/**
* An implementation of {@link ViewOperations} that integrates with {@link
IcebergCatalog}. Much
* of this code was originally copied from {@link
org.apache.iceberg.view.BaseViewOperations}.
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
index cc7054ff1..72cda7ecb 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
@@ -44,6 +44,7 @@ import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -62,6 +63,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
@@ -103,12 +105,14 @@ import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.NamespaceEntity;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
import org.apache.polaris.core.exceptions.CommitConflictException;
import org.apache.polaris.core.identity.provider.ServiceIdentityProvider;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
@@ -156,6 +160,7 @@ import org.apache.polaris.service.types.NotificationType;
import org.apache.polaris.service.types.TableUpdateNotification;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.assertj.core.configuration.PreferredAssumptionException;
@@ -2286,6 +2291,96 @@ public abstract class AbstractIcebergCatalogTest extends CatalogTests<IcebergCat
}
}
+ @Test
+ public void testTableInternalPropertiesStoredOnCommit() {
+ Assumptions.assumeTrue(
+ requiresNamespaceCreate(),
+ "Only applicable if namespaces must be created before adding
children");
+
+ catalog.createNamespace(NS);
+ catalog.buildTable(TABLE, SCHEMA).create();
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_A).commit();
+ Table afterAppend = catalog.loadTable(TABLE);
+ EntityResult schemaResult =
+ metaStoreManager.readEntityByName(
+ polarisContext,
+ List.of(catalogEntity),
+ PolarisEntityType.NAMESPACE,
+ PolarisEntitySubType.NULL_SUBTYPE,
+ NS.toString());
+ Assertions.assertThat(schemaResult).returns(true, EntityResult::isSuccess);
+ EntityResult tableResult =
+ metaStoreManager.readEntityByName(
+ polarisContext,
+ List.of(catalogEntity, schemaResult.getEntity()),
+ PolarisEntityType.TABLE_LIKE,
+ PolarisEntitySubType.ICEBERG_TABLE,
+ TABLE.name());
+ Assertions.assertThat(tableResult)
+ .returns(true, EntityResult::isSuccess)
+ .extracting(er -> PolarisEntity.of(er.getEntity()))
+ .extracting(PolarisEntity::getInternalPropertiesAsMap)
+ .asInstanceOf(InstanceOfAssertFactories.map(String.class,
String.class))
+ .containsEntry(NamespaceEntity.PARENT_NAMESPACE_KEY, NS.toString())
+ .containsEntry(
+ IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
+ String.valueOf(afterAppend.currentSnapshot().snapshotId()))
+ .containsEntry(IcebergTableLikeEntity.LOCATION, afterAppend.location())
+ .containsEntry(IcebergTableLikeEntity.TABLE_UUID,
afterAppend.uuid().toString())
+ .containsEntry(
+ IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
+ String.valueOf(afterAppend.schema().schemaId()))
+ .containsEntry(
+ IcebergTableLikeEntity.LAST_COLUMN_ID,
+ afterAppend.schema().columns().stream()
+ .max(Comparator.comparing(Types.NestedField::fieldId))
+ .map(Types.NestedField::fieldId)
+ .orElse(0)
+ .toString())
+ .containsEntry(
+ IcebergTableLikeEntity.LAST_SEQUENCE_NUMBER,
+ String.valueOf(afterAppend.currentSnapshot().sequenceNumber()));
+
+ catalog.loadTable(TABLE).refresh();
+ catalog.loadTable(TABLE).newFastAppend().appendFile(FILE_B).commit();
+ validatePropertiesUpdated(
+ schemaResult,
+ IcebergTableLikeEntity.CURRENT_SNAPSHOT_ID,
+ tbl -> String.valueOf(tbl.currentSnapshot().snapshotId()));
+
+ catalog.loadTable(TABLE).refresh();
+ catalog.loadTable(TABLE).updateSchema().addColumn("new_col",
Types.LongType.get()).commit();
+ validatePropertiesUpdated(
+ schemaResult,
+ IcebergTableLikeEntity.CURRENT_SCHEMA_ID,
+ tbl -> String.valueOf(tbl.schema().schemaId()));
+
+ catalog.loadTable(TABLE).refresh();
+ catalog.loadTable(TABLE).replaceSortOrder().desc("new_col",
NullOrder.NULLS_FIRST).commit();
+ validatePropertiesUpdated(
+ schemaResult,
+ IcebergTableLikeEntity.DEFAULT_SORT_ORDER_ID,
+ table -> String.valueOf(table.sortOrder().orderId()));
+ }
+
+ private void validatePropertiesUpdated(
+ EntityResult schemaResult, String key, Function<Table, String>
expectedValue) {
+ Table afterUpdate = catalog.loadTable(TABLE);
+ EntityResult tableResult =
+ metaStoreManager.readEntityByName(
+ polarisContext,
+ List.of(catalogEntity, schemaResult.getEntity()),
+ PolarisEntityType.TABLE_LIKE,
+ PolarisEntitySubType.ICEBERG_TABLE,
+ TABLE.name());
+ Assertions.assertThat(tableResult)
+ .returns(true, EntityResult::isSuccess)
+ .extracting(er -> PolarisEntity.of(er.getEntity()))
+ .extracting(PolarisEntity::getInternalPropertiesAsMap)
+ .asInstanceOf(InstanceOfAssertFactories.map(String.class,
String.class))
+ .containsEntry(key, expectedValue.apply(afterUpdate));
+ }
+
@Test
public void testEventsAreEmitted() {
IcebergCatalog catalog = catalog();