dennishuo commented on code in PR #433:
URL: https://github.com/apache/polaris/pull/433#discussion_r1833638321


##########
polaris-core/src/main/java/org/apache/polaris/core/entity/TableMetadataEntity.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * A {@link PolarisEntity} for storing table metadata. This can contain the 
raw content of the
+ * `metadata.json` or more granular information
+ */
+public class TableMetadataEntity extends PolarisEntity {
+  private static final String CONTENT_KEY = "content";
+  private static final String METADATA_LOCATION_KEY = "metadata_location";
+
+  public TableMetadataEntity(PolarisBaseEntity sourceEntity) {
+    super(sourceEntity);
+  }
+
+  public static TableMetadataEntity of(PolarisBaseEntity sourceEntity) {
+    if (sourceEntity != null) {
+      return new TableMetadataEntity(sourceEntity);
+    }
+    return null;
+  }
+
+  @JsonIgnore
+  public String getContent() {
+    return getInternalPropertiesAsMap().get(CONTENT_KEY);

Review Comment:
   Could this just be stored in `TableLikeEntity`'s internalProperties instead 
of introducing a new entity type?



##########
polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.persistence;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.TableLikeEntity;
+import org.apache.polaris.core.entity.TableMetadataEntity;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataCacheManager.class);
+
+  /** Load the cached {@link Table} or fall back to `fallback` if one doesn't 
exist */
+  public static TableMetadata loadTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView,
+      Supplier<TableMetadata> fallback) {
+    LOGGER.debug(String.format("Loading cached metadata for %s", 
tableIdentifier));
+    Optional<TableMetadata> cachedMetadata =
+        loadCachedTableMetadata(tableIdentifier, callContext, entityManager, 
resolvedEntityView);
+    if (cachedMetadata.isPresent()) {
+      LOGGER.debug(String.format("Using cached metadata for %s", 
tableIdentifier));
+      return cachedMetadata.get();
+    } else {
+      TableMetadata metadata = fallback.get();
+      PolarisMetaStoreManager.EntityResult cacheResult =
+          cacheTableMetadata(
+              tableIdentifier, metadata, callContext, entityManager, 
resolvedEntityView);
+      if (!cacheResult.isSuccess()) {
+        LOGGER.debug(String.format("Failed to cache metadata for %s", 
tableIdentifier));
+      }
+      return metadata;
+    }
+  }
+
+  /**
+   * Attempt to add table metadata to the cache
+   *
+   * @return The result of trying to cache the metadata
+   */
+  private static PolarisMetaStoreManager.EntityResult cacheTableMetadata(
+      TableIdentifier tableIdentifier,
+      TableMetadata metadata,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return new PolarisMetaStoreManager.EntityResult(
+          PolarisMetaStoreManager.ReturnStatus.ENTITY_NOT_FOUND, null);
+    } else {
+      TableLikeEntity tableEntity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      TableMetadataEntity tableMetadataEntity =
+          new TableMetadataEntity.Builder()
+              .setCatalogId(tableEntity.getCatalogId())
+              .setParentId(tableEntity.getId())
+              
.setId(entityManager.getMetaStoreManager().generateNewEntityId(callContext).getId())
+              .setCreateTimestamp(System.currentTimeMillis())
+              .setMetadataLocation(metadata.metadataFileLocation())
+              .setContent(TableMetadataParser.toJson(metadata))
+              .build();
+      try {
+        return entityManager
+            .getMetaStoreManager()
+            .createEntityIfNotExists(
+                callContext,
+                PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
+                tableMetadataEntity);
+      } catch (RuntimeException e) {
+        // PersistenceException (& other extension-specific exceptions) may 
not be in scope,
+        // but we can make a best-effort attempt to swallow it and just forego 
caching
+        if (e.toString().contains("PersistenceException")) {
+          return new PolarisMetaStoreManager.EntityResult(
+              PolarisMetaStoreManager.ReturnStatus.UNEXPECTED_ERROR_SIGNALED, 
e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /** Return the cached {@link Table} entity, if one exists */
+  private static @NotNull Optional<TableMetadata> loadCachedTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return Optional.empty();
+    } else {
+      TableLikeEntity entity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      String metadataLocation = entity.getMetadataLocation();
+      PolarisMetaStoreManager.ListEntitiesResult metadataResult =
+          entityManager
+              .getMetaStoreManager()
+              .listEntities(

Review Comment:
   The extra listEntities round trip here could really hurt performance. Is 
this just because we're not making the creation of the metadata entity atomic 
with the underlying TableLikeEntity so multiple creations of metadata entities 
could happen at the same time? 



##########
polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.persistence;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.TableLikeEntity;
+import org.apache.polaris.core.entity.TableMetadataEntity;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataCacheManager.class);
+
+  /** Load the cached {@link Table} or fall back to `fallback` if one doesn't 
exist */
+  public static TableMetadata loadTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView,
+      Supplier<TableMetadata> fallback) {
+    LOGGER.debug(String.format("Loading cached metadata for %s", 
tableIdentifier));
+    Optional<TableMetadata> cachedMetadata =
+        loadCachedTableMetadata(tableIdentifier, callContext, entityManager, 
resolvedEntityView);
+    if (cachedMetadata.isPresent()) {
+      LOGGER.debug(String.format("Using cached metadata for %s", 
tableIdentifier));
+      return cachedMetadata.get();
+    } else {
+      TableMetadata metadata = fallback.get();
+      PolarisMetaStoreManager.EntityResult cacheResult =
+          cacheTableMetadata(
+              tableIdentifier, metadata, callContext, entityManager, 
resolvedEntityView);
+      if (!cacheResult.isSuccess()) {
+        LOGGER.debug(String.format("Failed to cache metadata for %s", 
tableIdentifier));
+      }
+      return metadata;
+    }
+  }
+
+  /**
+   * Attempt to add table metadata to the cache
+   *
+   * @return The result of trying to cache the metadata
+   */
+  private static PolarisMetaStoreManager.EntityResult cacheTableMetadata(
+      TableIdentifier tableIdentifier,
+      TableMetadata metadata,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return new PolarisMetaStoreManager.EntityResult(
+          PolarisMetaStoreManager.ReturnStatus.ENTITY_NOT_FOUND, null);
+    } else {
+      TableLikeEntity tableEntity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      TableMetadataEntity tableMetadataEntity =
+          new TableMetadataEntity.Builder()
+              .setCatalogId(tableEntity.getCatalogId())
+              .setParentId(tableEntity.getId())
+              
.setId(entityManager.getMetaStoreManager().generateNewEntityId(callContext).getId())
+              .setCreateTimestamp(System.currentTimeMillis())
+              .setMetadataLocation(metadata.metadataFileLocation())
+              .setContent(TableMetadataParser.toJson(metadata))
+              .build();
+      try {
+        return entityManager
+            .getMetaStoreManager()
+            .createEntityIfNotExists(
+                callContext,
+                PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
+                tableMetadataEntity);
+      } catch (RuntimeException e) {
+        // PersistenceException (& other extension-specific exceptions) may 
not be in scope,
+        // but we can make a best-effort attempt to swallow it and just forego 
caching
+        if (e.toString().contains("PersistenceException")) {
+          return new PolarisMetaStoreManager.EntityResult(
+              PolarisMetaStoreManager.ReturnStatus.UNEXPECTED_ERROR_SIGNALED, 
e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /** Return the cached {@link Table} entity, if one exists */
+  private static @NotNull Optional<TableMetadata> loadCachedTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return Optional.empty();
+    } else {
+      TableLikeEntity entity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      String metadataLocation = entity.getMetadataLocation();
+      PolarisMetaStoreManager.ListEntitiesResult metadataResult =
+          entityManager
+              .getMetaStoreManager()
+              .listEntities(
+                  callContext,
+                  PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
+                  PolarisEntityType.TABLE_METADATA,
+                  PolarisEntitySubType.ANY_SUBTYPE);
+      return Optional.ofNullable(metadataResult.getEntities()).stream()
+          .flatMap(Collection::stream)
+          .flatMap(
+              result -> {
+                PolarisMetaStoreManager.EntityResult metadataEntityResult =
+                    entityManager
+                        .getMetaStoreManager()
+                        .loadEntity(callContext, result.getCatalogId(), 
result.getId());

Review Comment:
   Using loadEntity means we won't benefit from the `EntityCache` which means 
an extra roundtrip to the persistence store.
   
   If we embed the contents directly into the main `TableLikeEntity` then the 
EntityCache including its cache invalidation will just automatically work.



##########
polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.persistence;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.TableLikeEntity;
+import org.apache.polaris.core.entity.TableMetadataEntity;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataCacheManager.class);
+
+  /** Load the cached {@link Table} or fall back to `fallback` if one doesn't 
exist */
+  public static TableMetadata loadTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView,
+      Supplier<TableMetadata> fallback) {
+    LOGGER.debug(String.format("Loading cached metadata for %s", 
tableIdentifier));
+    Optional<TableMetadata> cachedMetadata =
+        loadCachedTableMetadata(tableIdentifier, callContext, entityManager, 
resolvedEntityView);
+    if (cachedMetadata.isPresent()) {
+      LOGGER.debug(String.format("Using cached metadata for %s", 
tableIdentifier));
+      return cachedMetadata.get();
+    } else {
+      TableMetadata metadata = fallback.get();
+      PolarisMetaStoreManager.EntityResult cacheResult =
+          cacheTableMetadata(
+              tableIdentifier, metadata, callContext, entityManager, 
resolvedEntityView);
+      if (!cacheResult.isSuccess()) {
+        LOGGER.debug(String.format("Failed to cache metadata for %s", 
tableIdentifier));
+      }
+      return metadata;
+    }
+  }
+
+  /**
+   * Attempt to add table metadata to the cache
+   *
+   * @return The result of trying to cache the metadata
+   */
+  private static PolarisMetaStoreManager.EntityResult cacheTableMetadata(
+      TableIdentifier tableIdentifier,
+      TableMetadata metadata,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return new PolarisMetaStoreManager.EntityResult(
+          PolarisMetaStoreManager.ReturnStatus.ENTITY_NOT_FOUND, null);
+    } else {
+      TableLikeEntity tableEntity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      TableMetadataEntity tableMetadataEntity =
+          new TableMetadataEntity.Builder()
+              .setCatalogId(tableEntity.getCatalogId())
+              .setParentId(tableEntity.getId())
+              
.setId(entityManager.getMetaStoreManager().generateNewEntityId(callContext).getId())
+              .setCreateTimestamp(System.currentTimeMillis())
+              .setMetadataLocation(metadata.metadataFileLocation())
+              .setContent(TableMetadataParser.toJson(metadata))
+              .build();
+      try {
+        return entityManager
+            .getMetaStoreManager()
+            .createEntityIfNotExists(
+                callContext,
+                PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),

Review Comment:
   Is this the first time we have something with a parentId that isn't a 
Namespace or a Catalog?



##########
polaris-service/src/main/java/org/apache/polaris/service/persistence/MetadataCacheManager.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.persistence;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.TableLikeEntity;
+import org.apache.polaris.core.entity.TableMetadataEntity;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataCacheManager.class);
+
+  /** Load the cached {@link Table} or fall back to `fallback` if one doesn't 
exist */
+  public static TableMetadata loadTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView,
+      Supplier<TableMetadata> fallback) {
+    LOGGER.debug(String.format("Loading cached metadata for %s", 
tableIdentifier));
+    Optional<TableMetadata> cachedMetadata =
+        loadCachedTableMetadata(tableIdentifier, callContext, entityManager, 
resolvedEntityView);
+    if (cachedMetadata.isPresent()) {
+      LOGGER.debug(String.format("Using cached metadata for %s", 
tableIdentifier));
+      return cachedMetadata.get();
+    } else {
+      TableMetadata metadata = fallback.get();
+      PolarisMetaStoreManager.EntityResult cacheResult =
+          cacheTableMetadata(
+              tableIdentifier, metadata, callContext, entityManager, 
resolvedEntityView);
+      if (!cacheResult.isSuccess()) {
+        LOGGER.debug(String.format("Failed to cache metadata for %s", 
tableIdentifier));
+      }
+      return metadata;
+    }
+  }
+
+  /**
+   * Attempt to add table metadata to the cache
+   *
+   * @return The result of trying to cache the metadata
+   */
+  private static PolarisMetaStoreManager.EntityResult cacheTableMetadata(
+      TableIdentifier tableIdentifier,
+      TableMetadata metadata,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return new PolarisMetaStoreManager.EntityResult(
+          PolarisMetaStoreManager.ReturnStatus.ENTITY_NOT_FOUND, null);
+    } else {
+      TableLikeEntity tableEntity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      TableMetadataEntity tableMetadataEntity =
+          new TableMetadataEntity.Builder()
+              .setCatalogId(tableEntity.getCatalogId())
+              .setParentId(tableEntity.getId())
+              
.setId(entityManager.getMetaStoreManager().generateNewEntityId(callContext).getId())
+              .setCreateTimestamp(System.currentTimeMillis())
+              .setMetadataLocation(metadata.metadataFileLocation())
+              .setContent(TableMetadataParser.toJson(metadata))
+              .build();
+      try {
+        return entityManager
+            .getMetaStoreManager()
+            .createEntityIfNotExists(
+                callContext,
+                PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
+                tableMetadataEntity);
+      } catch (RuntimeException e) {
+        // PersistenceException (& other extension-specific exceptions) may 
not be in scope,
+        // but we can make a best-effort attempt to swallow it and just forego 
caching
+        if (e.toString().contains("PersistenceException")) {
+          return new PolarisMetaStoreManager.EntityResult(
+              PolarisMetaStoreManager.ReturnStatus.UNEXPECTED_ERROR_SIGNALED, 
e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /** Return the cached {@link Table} entity, if one exists */
+  private static @NotNull Optional<TableMetadata> loadCachedTableMetadata(
+      TableIdentifier tableIdentifier,
+      PolarisCallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, 
PolarisEntitySubType.TABLE);
+    if (resolvedEntities == null) {
+      return Optional.empty();
+    } else {
+      TableLikeEntity entity = 
TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
+      String metadataLocation = entity.getMetadataLocation();
+      PolarisMetaStoreManager.ListEntitiesResult metadataResult =
+          entityManager
+              .getMetaStoreManager()
+              .listEntities(
+                  callContext,
+                  PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
+                  PolarisEntityType.TABLE_METADATA,
+                  PolarisEntitySubType.ANY_SUBTYPE);
+      return Optional.ofNullable(metadataResult.getEntities()).stream()
+          .flatMap(Collection::stream)
+          .flatMap(
+              result -> {
+                PolarisMetaStoreManager.EntityResult metadataEntityResult =
+                    entityManager
+                        .getMetaStoreManager()
+                        .loadEntity(callContext, result.getCatalogId(), 
result.getId());
+                return Optional.ofNullable(metadataEntityResult.getEntity())
+                    .map(TableMetadataEntity::of)
+                    .stream();
+              })
+          .filter(
+              metadata -> {
+                if (metadata.getMetadataLocation().equals(metadataLocation)) {
+                  return true;
+                } else {
+                  LOGGER.debug(
+                      String.format("Deleting old entry for %s", 
metadata.getMetadataLocation()));
+                  entityManager
+                      .getMetaStoreManager()
+                      .dropEntityIfExists(

Review Comment:
   Do we have any cleanup for cached metadata entities when the underlying 
table is dropped?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to