This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 0798dced0e [#9915]feat(IRC): Add view entities to Gravitino store 
(#9917)
0798dced0e is described below

commit 0798dced0eb25679bce57e0b2ed96b1cc48c79d9
Author: Bharath Krishna <[email protected]>
AuthorDate: Tue Feb 10 21:33:30 2026 +0530

    [#9915]feat(IRC): Add view entities to Gravitino store (#9917)
    
    ### What changes were proposed in this pull request?
    
    Add logic to add or import view entities into the Gravitino entity
    store. This helps maintain consistency between the IRC (Iceberg REST
    catalog) store and the Gravitino entity store.
    
    
    ### Why are the changes needed?
    
    This helps to maintain consistency between the IRC catalog and the
    Gravitino entity store. Entities that are present in the IRC catalog
    but missing from Gravitino are lazily imported when they are loaded.
    
    Fix: #9915
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. It only imports views into the Gravitino entity store.
    
    ### How was this patch tested?
    
    Added unit tests
---
 .../gravitino/catalog/EntityCombinedView.java      |  91 ++++++
 .../gravitino/catalog/ViewOperationDispatcher.java | 100 +++++-
 .../catalog/TestViewOperationDispatcher.java       | 250 ++++++++++++--
 .../org/apache/gravitino/iceberg/RESTService.java  |  10 +-
 .../dispatcher/IcebergViewHookDispatcher.java      |  96 +++++-
 .../dispatcher/TestIcebergViewHookDispatcher.java  | 364 +++++++++++++++++++++
 6 files changed, 847 insertions(+), 64 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedView.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedView.java
new file mode 100644
index 0000000000..da8946b569
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedView.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import java.util.Map;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.meta.GenericEntity;
+import org.apache.gravitino.rel.View;
+
+/**
+ * A View class to represent a view metadata object that combines the metadata 
both from {@link
+ * View} and {@link GenericEntity}.
+ */
+public final class EntityCombinedView implements View {
+
+  private final View view;
+
+  private final GenericEntity viewEntity;
+
+  // Field "imported" is used to indicate whether the entity has been imported 
to Gravitino
+  // managed storage backend. If "imported" is true, it means that storage 
backend have stored
+  // the correct entity. Otherwise, we should import the external entity to 
the storage backend.
+  private boolean imported;
+
+  private EntityCombinedView(View view, GenericEntity viewEntity) {
+    this.view = view;
+    this.viewEntity = viewEntity;
+    this.imported = false;
+  }
+
+  public static EntityCombinedView of(View view, GenericEntity viewEntity) {
+    return new EntityCombinedView(view, viewEntity);
+  }
+
+  public static EntityCombinedView of(View view) {
+    return new EntityCombinedView(view, null);
+  }
+
+  public EntityCombinedView withImported(boolean imported) {
+    this.imported = imported;
+    return this;
+  }
+
+  @Override
+  public String name() {
+    return view.name();
+  }
+
+  @Override
+  public String comment() {
+    return view.comment();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return view.properties();
+  }
+
+  @Override
+  public Audit auditInfo() {
+    return view.auditInfo();
+  }
+
+  public boolean imported() {
+    return imported;
+  }
+
+  public View viewFromCatalog() {
+    return view;
+  }
+
+  public GenericEntity viewFromGravitino() {
+    return viewEntity;
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/ViewOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/ViewOperationDispatcher.java
index c49a93abfb..70f080f7d7 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/ViewOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/ViewOperationDispatcher.java
@@ -20,11 +20,16 @@ package org.apache.gravitino.catalog;
 
 import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
+import java.io.IOException;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchViewException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.rel.View;
 import org.apache.gravitino.storage.IdGenerator;
 import org.slf4j.Logger;
@@ -55,10 +60,8 @@ public class ViewOperationDispatcher extends 
OperationDispatcher implements View
   /**
    * Load view metadata by identifier from the catalog.
    *
-   * <p>Delegates directly to the underlying catalog's ViewCatalog interface. 
Views are loaded from
-   * the external catalog without caching in Gravitino's EntityStore.
-   *
-   * <p>TODO(#9746): Add entity storage support to cache view metadata in 
EntityStore.
+   * <p>This method first checks if the view exists in Gravitino's 
EntityStore. If not found, it
+   * loads from the catalog and auto-imports into EntityStore.
    *
    * @param ident The view identifier.
    * @return The loaded view metadata.
@@ -68,13 +71,86 @@ public class ViewOperationDispatcher extends 
OperationDispatcher implements View
   public View loadView(NameIdentifier ident) throws NoSuchViewException {
     LOG.info("Loading view: {}", ident);
 
-    return TreeLockUtils.doWithTreeLock(
-        ident,
-        LockType.READ,
-        () ->
-            doWithCatalog(
-                getCatalogIdentifier(ident),
-                c -> c.doWithViewOps(v -> v.loadView(ident)),
-                NoSuchViewException.class));
+    // First load with READ lock to check if view is already imported
+    EntityCombinedView entityCombinedView =
+        TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> 
internalLoadView(ident));
+
+    if (!entityCombinedView.imported()) {
+      // Load the schema to make sure the schema is imported.
+      SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
+      NameIdentifier schemaIdent = 
NameIdentifier.of(ident.namespace().levels());
+      schemaDispatcher.loadSchema(schemaIdent);
+
+      // Import the view.
+      entityCombinedView =
+          TreeLockUtils.doWithTreeLock(schemaIdent, LockType.WRITE, () -> 
importView(ident));
+    }
+
+    return entityCombinedView;
+  }
+
+  /**
+   * Internal method to load view and check if it exists in entity store.
+   *
+   * @param ident The view identifier.
+   * @return EntityCombinedView containing the view and import status.
+   * @throws NoSuchViewException If the view does not exist.
+   */
+  private EntityCombinedView internalLoadView(NameIdentifier ident) throws 
NoSuchViewException {
+    // Load view from the underlying catalog
+    View catalogView =
+        doWithCatalog(
+            getCatalogIdentifier(ident),
+            c -> c.doWithViewOps(v -> v.loadView(ident)),
+            NoSuchViewException.class);
+
+    // Check if view exists in entity store
+    try {
+      GenericEntity viewEntity = store.get(ident, Entity.EntityType.VIEW, 
GenericEntity.class);
+      return EntityCombinedView.of(catalogView, viewEntity).withImported(true);
+    } catch (NoSuchEntityException e) {
+      // View not in store yet
+      LOG.debug("View {} not found in entity store", ident);
+      return EntityCombinedView.of(catalogView).withImported(false);
+    } catch (IOException ioe) {
+      LOG.warn("Failed to check if view {} exists in entity store", ident, 
ioe);
+      return EntityCombinedView.of(catalogView).withImported(false);
+    }
+  }
+
+  /**
+   * Import view into Gravitino entity store.
+   *
+   * @param ident The view identifier.
+   * @return EntityCombinedView containing the view and import status.
+   * @throws NoSuchViewException If the view does not exist.
+   */
+  private EntityCombinedView importView(NameIdentifier ident) throws 
NoSuchViewException {
+    // Double-check if already imported (another thread might have imported 
between locks)
+    EntityCombinedView entityCombinedView = internalLoadView(ident);
+
+    if (entityCombinedView.imported()) {
+      return entityCombinedView;
+    }
+
+    LOG.info("Auto-importing view {} into Gravitino entity store", ident);
+    long uid = idGenerator.nextId();
+    GenericEntity newViewEntity =
+        GenericEntity.builder()
+            .withId(uid)
+            .withName(ident.name())
+            .withNamespace(ident.namespace())
+            .withEntityType(Entity.EntityType.VIEW)
+            .build();
+    try {
+      store.put(newViewEntity, false /* overwrite */);
+      LOG.info("Successfully imported view {} into entity store with id {}", 
ident, uid);
+      return EntityCombinedView.of(entityCombinedView.viewFromCatalog(), 
newViewEntity)
+          .withImported(true);
+    } catch (Exception e) {
+      // Log but don't fail - view import is best-effort
+      LOG.warn("Failed to import view {} into entity store: {}", ident, 
e.getMessage());
+      return 
EntityCombinedView.of(entityCombinedView.viewFromCatalog()).withImported(false);
+    }
   }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestViewOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestViewOperationDispatcher.java
index f602a2b53b..9f91ea6afe 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestViewOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestViewOperationDispatcher.java
@@ -21,23 +21,34 @@ package org.apache.gravitino.catalog;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Entity.EntityType.VIEW;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.TestCatalog;
 import org.apache.gravitino.connector.TestCatalogOperations;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchViewException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.gravitino.rel.View;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -74,6 +85,33 @@ public class TestViewOperationDispatcher extends 
TestOperationDispatcher {
     return catalogManager;
   }
 
+  /**
+   * Helper method to create a mock View object.
+   *
+   * @param name The name of the view
+   * @param props The properties of the view
+   * @param auditInfo The audit info of the view
+   * @return A mock View implementation
+   */
+  private static View createMockView(String name, Map<String, String> props, 
AuditInfo auditInfo) {
+    return new View() {
+      @Override
+      public String name() {
+        return name;
+      }
+
+      @Override
+      public Map<String, String> properties() {
+        return props;
+      }
+
+      @Override
+      public AuditInfo auditInfo() {
+        return auditInfo;
+      }
+    };
+  }
+
   @Test
   public void testLoadView() throws IOException {
     Namespace viewNs = Namespace.of(metalake, catalog, "schema61");
@@ -85,23 +123,7 @@ public class TestViewOperationDispatcher extends 
TestOperationDispatcher {
     // Create a mock view through the catalog operations
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
-    View mockView =
-        new View() {
-          @Override
-          public String name() {
-            return "view1";
-          }
-
-          @Override
-          public Map<String, String> properties() {
-            return props;
-          }
-
-          @Override
-          public AuditInfo auditInfo() {
-            return auditInfo;
-          }
-        };
+    View mockView = createMockView("view1", props, auditInfo);
 
     // Mock the catalog operations to return the view
     TestCatalog testCatalog =
@@ -144,24 +166,7 @@ public class TestViewOperationDispatcher extends 
TestOperationDispatcher {
       NameIdentifier viewIdent = NameIdentifier.of(viewNs, "view" + i);
       AuditInfo auditInfo =
           
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
-      int index = i;
-      View mockView =
-          new View() {
-            @Override
-            public String name() {
-              return "view" + index;
-            }
-
-            @Override
-            public Map<String, String> properties() {
-              return props;
-            }
-
-            @Override
-            public AuditInfo auditInfo() {
-              return auditInfo;
-            }
-          };
+      View mockView = createMockView("view" + i, props, auditInfo);
       testCatalogOperations.views.put(viewIdent, mockView);
     }
 
@@ -172,4 +177,179 @@ public class TestViewOperationDispatcher extends 
TestOperationDispatcher {
       Assertions.assertEquals("view" + i, loadedView.name());
     }
   }
+
+  @Test
+  public void testLoadViewAutoImportsIntoEntityStore() throws IOException {
+    Namespace viewNs = Namespace.of(metalake, catalog, "schema63");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaOperationDispatcher.createSchema(NameIdentifier.of(viewNs.levels()), 
"comment", props);
+
+    NameIdentifier viewIdent = NameIdentifier.of(viewNs, "auto_import_view");
+
+    // Create a mock view through the catalog operations
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    View mockView = createMockView("auto_import_view", props, auditInfo);
+
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.views.put(viewIdent, mockView);
+
+    // Verify view is not in entity store initially
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> entityStore.get(viewIdent, VIEW, 
GenericEntity.class));
+
+    // Load view - should auto-import
+    View loadedView = viewOperationDispatcher.loadView(viewIdent);
+    Assertions.assertEquals("auto_import_view", loadedView.name());
+
+    // Verify view was auto-imported into entity store
+    GenericEntity viewEntity = entityStore.get(viewIdent, VIEW, 
GenericEntity.class);
+    Assertions.assertNotNull(viewEntity);
+    Assertions.assertEquals(viewIdent.name(), viewEntity.name());
+    Assertions.assertEquals(viewIdent.namespace(), viewEntity.namespace());
+    Assertions.assertEquals(Entity.EntityType.VIEW, viewEntity.type());
+  }
+
+  @Test
+  public void testLoadViewSkipsImportWhenAlreadyInEntityStore() throws 
IOException {
+    Namespace viewNs = Namespace.of(metalake, catalog, "schema64");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaOperationDispatcher.createSchema(NameIdentifier.of(viewNs.levels()), 
"comment", props);
+
+    NameIdentifier viewIdent = NameIdentifier.of(viewNs, 
"already_imported_view");
+
+    // Pre-populate entity store with view entity
+    GenericEntity viewEntity =
+        GenericEntity.builder()
+            .withId(1L)
+            .withName(viewIdent.name())
+            .withNamespace(viewIdent.namespace())
+            .withEntityType(Entity.EntityType.VIEW)
+            .build();
+    entityStore.put(viewEntity, false);
+
+    // Create a mock view through the catalog operations
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    View mockView = createMockView("already_imported_view", props, auditInfo);
+
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.views.put(viewIdent, mockView);
+
+    // Record the initial entity state
+    GenericEntity initialEntity = entityStore.get(viewIdent, VIEW, 
GenericEntity.class);
+    long initialId = initialEntity.id();
+
+    // Load view - should load from catalog but skip import since already in 
entity store
+    View loadedView = viewOperationDispatcher.loadView(viewIdent);
+    Assertions.assertEquals("already_imported_view", loadedView.name());
+
+    // Verify entity is still in store with same ID (no duplicate import)
+    // If import was called, a new entity would be created with a different ID
+    GenericEntity retrievedEntity = entityStore.get(viewIdent, VIEW, 
GenericEntity.class);
+    Assertions.assertNotNull(retrievedEntity);
+    Assertions.assertEquals(initialId, retrievedEntity.id(), "Entity ID should 
not change");
+    Assertions.assertEquals(1L, retrievedEntity.id(), "Entity ID should remain 
1L (no new import)");
+  }
+
+  @Test
+  public void testLoadViewAutoImportWithMultipleConcurrentLoads() throws 
Exception {
+    Namespace viewNs = Namespace.of(metalake, catalog, "schema66");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaOperationDispatcher.createSchema(NameIdentifier.of(viewNs.levels()), 
"comment", props);
+
+    NameIdentifier viewIdent = NameIdentifier.of(viewNs, "concurrent_view");
+
+    // Create a mock view through the catalog operations
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    View mockView = createMockView("concurrent_view", props, auditInfo);
+
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.views.put(viewIdent, mockView);
+
+    // Test concurrent loads with multiple threads
+    int threadCount = 10;
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+    CountDownLatch latch = new CountDownLatch(threadCount);
+    List<Future<View>> futures = new ArrayList<>();
+
+    try {
+      // Submit concurrent load tasks
+      for (int i = 0; i < threadCount; i++) {
+        Future<View> future =
+            executor.submit(
+                () -> {
+                  latch.countDown();
+                  latch.await(); // Wait for all threads to be ready
+                  return viewOperationDispatcher.loadView(viewIdent);
+                });
+        futures.add(future);
+      }
+
+      // Verify all concurrent loads succeeded and reference the same imported 
entity
+      // If import was called multiple times, entity IDs would differ across 
views
+      Long entityId = null;
+      for (Future<View> future : futures) {
+        View loadedView = future.get(5, TimeUnit.SECONDS);
+        Assertions.assertEquals("concurrent_view", loadedView.name());
+
+        EntityCombinedView combinedView = (EntityCombinedView) loadedView;
+        long currentEntityId = combinedView.viewFromGravitino().id();
+
+        if (entityId == null) {
+          entityId = currentEntityId;
+        } else {
+          Assertions.assertEquals(
+              entityId,
+              currentEntityId,
+              "All concurrent loads should reference the same entity (import 
called only once)");
+        }
+      }
+
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+  }
+
+  @Test
+  public void testLoadViewAfterManualDelete() throws IOException {
+    Namespace viewNs = Namespace.of(metalake, catalog, "schema67");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    schemaOperationDispatcher.createSchema(NameIdentifier.of(viewNs.levels()), 
"comment", props);
+
+    NameIdentifier viewIdent = NameIdentifier.of(viewNs, "deleted_view");
+
+    // Create a mock view through the catalog operations
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+    View mockView = createMockView("deleted_view", props, auditInfo);
+
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.views.put(viewIdent, mockView);
+
+    // Load view - should auto-import
+    View loadedView1 = viewOperationDispatcher.loadView(viewIdent);
+    Assertions.assertEquals("deleted_view", loadedView1.name());
+
+    // Manually delete from entity store
+    entityStore.delete(viewIdent, VIEW);
+
+    // Load view again - should re-import
+    View loadedView2 = viewOperationDispatcher.loadView(viewIdent);
+    Assertions.assertEquals("deleted_view", loadedView2.name());
+
+    // Verify view was re-imported
+    GenericEntity viewEntity = entityStore.get(viewIdent, VIEW, 
GenericEntity.class);
+    Assertions.assertNotNull(viewEntity);
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index 553b803edb..47cdb9badd 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -113,12 +113,14 @@ public class RESTService implements 
GravitinoAuxiliaryService {
     }
     IcebergTableEventDispatcher icebergTableEventDispatcher =
         new IcebergTableEventDispatcher(icebergTableOperationDispatcher, 
eventBus, metalakeName);
-    IcebergViewOperationExecutor icebergViewOperationExecutor =
+    IcebergViewOperationDispatcher icebergViewOperationDispatcher =
         new IcebergViewOperationExecutor(icebergCatalogWrapperManager);
-    IcebergViewHookDispatcher icebergViewHookDispatcher =
-        new IcebergViewHookDispatcher(icebergViewOperationExecutor, 
metalakeName);
+    if (authorizationContext.isAuthorizationEnabled()) {
+      icebergViewOperationDispatcher =
+          new IcebergViewHookDispatcher(icebergViewOperationDispatcher, 
metalakeName);
+    }
     IcebergViewEventDispatcher icebergViewEventDispatcher =
-        new IcebergViewEventDispatcher(icebergViewHookDispatcher, eventBus, 
metalakeName);
+        new IcebergViewEventDispatcher(icebergViewOperationDispatcher, 
eventBus, metalakeName);
 
     IcebergNamespaceOperationDispatcher namespaceOperationDispatcher =
         new IcebergNamespaceOperationExecutor(icebergCatalogWrapperManager);
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
index b686346ce3..5501336af1 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergViewHookDispatcher.java
@@ -18,10 +18,16 @@
  */
 package org.apache.gravitino.iceberg.service.dispatcher;
 
+import java.io.IOException;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.catalog.ViewDispatcher;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
 import org.apache.gravitino.listener.api.event.IcebergRequestContext;
+import org.apache.gravitino.meta.GenericEntity;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateViewRequest;
@@ -62,16 +68,14 @@ public class IcebergViewHookDispatcher implements 
IcebergViewOperationDispatcher
     // Then import it into Gravitino so Gravitino is aware of the view
     importView(context.catalogName(), namespace, createViewRequest.name());
 
-    // TODO(#9746): Enable view ownership once ViewMetaService is implemented
-    // Currently disabled because VIEW entity type is not supported in
-    // RelationalEntityStoreIdResolver
-    // IcebergOwnershipUtils.setViewOwner(
-    //     metalake,
-    //     context.catalogName(),
-    //     namespace,
-    //     createViewRequest.name(),
-    //     context.userName(),
-    //     GravitinoEnv.getInstance().ownerDispatcher());
+    // Set ownership for the newly created view
+    IcebergOwnershipUtils.setViewOwner(
+        metalake,
+        context.catalogName(),
+        namespace,
+        createViewRequest.name(),
+        context.userName(),
+        GravitinoEnv.getInstance().ownerDispatcher());
 
     return response;
   }
@@ -92,8 +96,40 @@ public class IcebergViewHookDispatcher implements 
IcebergViewOperationDispatcher
   @Override
   public void dropView(IcebergRequestContext context, TableIdentifier 
viewIdentifier) {
     dispatcher.dropView(context, viewIdentifier);
-    // Note: We don't remove from Gravitino here as that will be handled by 
entity storage
-    // in future work (issue #9746)
+
+    // Remove view from Gravitino entity store
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    try {
+      if (store != null) {
+        store.delete(
+            IcebergIdentifierUtils.toGravitinoTableIdentifier(
+                metalake, context.catalogName(), viewIdentifier),
+            Entity.EntityType.VIEW);
+        LOG.info(
+            "Successfully removed view from Gravitino entity store: 
{}.{}.{}.{}",
+            metalake,
+            context.catalogName(),
+            viewIdentifier.namespace(),
+            viewIdentifier.name());
+      }
+    } catch (NoSuchEntityException ignore) {
+      // Ignore if the view entity does not exist in the store
+      LOG.debug(
+          "View entity does not exist in store: {}.{}.{}.{}",
+          metalake,
+          context.catalogName(),
+          viewIdentifier.namespace(),
+          viewIdentifier.name());
+    } catch (IOException ioe) {
+      LOG.error(
+          "Failed to delete view entity from store: {}.{}.{}.{}",
+          metalake,
+          context.catalogName(),
+          viewIdentifier.namespace(),
+          viewIdentifier.name(),
+          ioe);
+      throw new RuntimeException("Failed to delete view entity from store", 
ioe);
+    }
   }
 
   @Override
@@ -109,7 +145,41 @@ public class IcebergViewHookDispatcher implements 
IcebergViewOperationDispatcher
   @Override
   public void renameView(IcebergRequestContext context, RenameTableRequest 
renameViewRequest) {
     dispatcher.renameView(context, renameViewRequest);
-    // Note: Rename handling in Gravitino will be added with full view support 
(issue #9746)
+
+    // Update view in Gravitino entity store with new name
+    NameIdentifier sourceIdent =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(
+            metalake, context.catalogName(), renameViewRequest.source());
+    NameIdentifier destIdent =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(
+            metalake, context.catalogName(), renameViewRequest.destination());
+
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    try {
+      if (store != null) {
+        store.update(
+            sourceIdent,
+            GenericEntity.class,
+            Entity.EntityType.VIEW,
+            viewEntity ->
+                GenericEntity.builder()
+                    .withId(viewEntity.id())
+                    .withName(destIdent.name())
+                    .withNamespace(destIdent.namespace())
+                    .withEntityType(Entity.EntityType.VIEW)
+                    .build());
+        LOG.info(
+            "Successfully renamed view in Gravitino entity store from {} to 
{}",
+            sourceIdent,
+            destIdent);
+      }
+    } catch (NoSuchEntityException ignore) {
+      // Ignore if the source view entity does not exist in the store
+      LOG.debug("Source view entity does not exist in store: {}", sourceIdent);
+    } catch (IOException ioe) {
+      LOG.error("Failed to rename view entity in store from {} to {}", 
sourceIdent, destIdent, ioe);
+      throw new RuntimeException("Failed to rename view entity in store", ioe);
+    }
   }
 
   /**
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
new file mode 100644
index 0000000000..6e524f2e9b
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/dispatcher/TestIcebergViewHookDispatcher.java
@@ -0,0 +1,364 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.authorization.OwnerDispatcher;
+import org.apache.gravitino.catalog.ViewDispatcher;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.iceberg.common.utils.IcebergIdentifierUtils;
+import org.apache.gravitino.listener.api.event.IcebergRequestContext;
+import org.apache.gravitino.meta.GenericEntity;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.CreateViewRequest;
+import org.apache.iceberg.rest.requests.ImmutableCreateViewRequest;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.ImmutableSQLViewRepresentation;
+import org.apache.iceberg.view.ImmutableViewVersion;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link IcebergViewHookDispatcher}.
+ *
+ * <p>The tests wrap a mocked {@link IcebergViewOperationDispatcher} and check that each view
+ * operation is forwarded to it, and that create/drop/rename additionally interact with the mocked
+ * {@link EntityStore}, {@link ViewDispatcher} and {@link OwnerDispatcher} that are written into
+ * {@link GravitinoEnv} for the duration of each test.
+ */
+public class TestIcebergViewHookDispatcher {
+
+  // Fixed names shared by every test when building identifiers and stubbing the request context.
+  private static final String METALAKE = "test_metalake";
+  private static final String CATALOG = "test_catalog";
+  private static final String SCHEMA_NAME = "test_schema";
+  private static final String VIEW_NAME = "test_view";
+  private static final String USER = "test_user";
+
+  // Iceberg schema with a single required string field, used by the create-view requests.
+  private static final Schema VIEW_SCHEMA =
+      new Schema(Types.NestedField.required(1, "test_field", 
Types.StringType.get()));
+
+  // Dispatcher under test; rebuilt in setUp around mockExecutor.
+  private IcebergViewHookDispatcher hookDispatcher;
+  // Mock of the wrapped dispatcher that the hook dispatcher delegates to.
+  private IcebergViewOperationDispatcher mockExecutor;
+  // Mocks written into GravitinoEnv by setUp and cleared again by tearDown.
+  private EntityStore mockEntityStore;
+  private ViewDispatcher mockViewDispatcher;
+  private OwnerDispatcher mockOwnerDispatcher;
+  // Mocked request context; stubbed to return CATALOG and USER.
+  private IcebergRequestContext mockContext;
+  // GravitinoEnv instance obtained in setUp; kept so tearDown can reset its fields.
+  private GravitinoEnv gravitinoEnv;
+
+  /**
+   * Creates fresh mocks for every collaborator, writes the store and dispatcher mocks into
+   * {@link GravitinoEnv} by field name, and builds the dispatcher under test.
+   */
+  @BeforeEach
+  public void setUp() {
+    // Request context: the only mock that needs canned answers up front
+    mockContext = mock(IcebergRequestContext.class);
+    when(mockContext.userName()).thenReturn(USER);
+    when(mockContext.catalogName()).thenReturn(CATALOG);
+
+    // Remaining collaborators are plain mocks
+    mockExecutor = mock(IcebergViewOperationDispatcher.class);
+    mockOwnerDispatcher = mock(OwnerDispatcher.class);
+    mockViewDispatcher = mock(ViewDispatcher.class);
+    mockEntityStore = mock(EntityStore.class);
+
+    // Write the mocks into the GravitinoEnv instance (forceAccess = true)
+    gravitinoEnv = GravitinoEnv.getInstance();
+    try {
+      FieldUtils.writeField(gravitinoEnv, "entityStore", mockEntityStore, true);
+      FieldUtils.writeField(gravitinoEnv, "viewDispatcher", mockViewDispatcher, true);
+      FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", mockOwnerDispatcher, true);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to setup test", e);
+    }
+
+    hookDispatcher = new IcebergViewHookDispatcher(mockExecutor, METALAKE);
+  }
+
+  @AfterEach
+  public void tearDown() {
+    try {
+      // Clean up GravitinoEnv
+      FieldUtils.writeField(gravitinoEnv, "entityStore", null, true);
+      FieldUtils.writeField(gravitinoEnv, "viewDispatcher", null, true);
+      FieldUtils.writeField(gravitinoEnv, "ownerDispatcher", null, true);
+    } catch (Exception e) {
+      // Ignore cleanup errors
+    }
+  }
+
+  /**
+   * Creating a view must delegate to the wrapped dispatcher, load the view through the Gravitino
+   * view dispatcher, set the requesting user as owner, and return the delegate's response.
+   */
+  @Test
+  public void testCreateViewImportsIntoEntityStore() throws Exception {
+    Namespace viewNamespace = Namespace.of(SCHEMA_NAME);
+
+    // Build the request bottom-up: representation -> version -> request
+    ImmutableSQLViewRepresentation sqlRepresentation =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("SELECT * FROM test")
+            .dialect("spark")
+            .build();
+    ImmutableViewVersion viewVersion =
+        ImmutableViewVersion.builder()
+            .versionId(1)
+            .timestampMillis(System.currentTimeMillis())
+            .schemaId(1)
+            .defaultNamespace(viewNamespace)
+            .addRepresentations(sqlRepresentation)
+            .build();
+    CreateViewRequest request =
+        ImmutableCreateViewRequest.builder()
+            .name(VIEW_NAME)
+            .schema(VIEW_SCHEMA)
+            .viewVersion(viewVersion)
+            .build();
+
+    LoadViewResponse expectedResponse = mock(LoadViewResponse.class);
+    when(mockExecutor.createView(mockContext, viewNamespace, request))
+        .thenReturn(expectedResponse);
+
+    LoadViewResponse actualResponse =
+        hookDispatcher.createView(mockContext, viewNamespace, request);
+
+    // The underlying catalog received exactly one create call
+    verify(mockExecutor, times(1)).createView(mockContext, viewNamespace, request);
+
+    // The view was loaded through the Gravitino view dispatcher under its full identifier
+    NameIdentifier gravitinoView = NameIdentifier.of(METALAKE, CATALOG, SCHEMA_NAME, VIEW_NAME);
+    verify(mockViewDispatcher, times(1)).loadView(eq(gravitinoView));
+
+    // Ownership was assigned to the requesting user
+    verify(mockOwnerDispatcher, times(1)).setOwner(any(), any(), eq(USER), any());
+
+    assertEquals(expectedResponse, actualResponse);
+  }
+
+  /**
+   * A failure while loading the view through the Gravitino view dispatcher must not fail the
+   * create call: the delegate's response is still returned.
+   */
+  @Test
+  public void testCreateViewHandlesImportFailureGracefully() throws Exception {
+    Namespace viewNamespace = Namespace.of(SCHEMA_NAME);
+
+    // Build the request bottom-up: representation -> version -> request
+    ImmutableSQLViewRepresentation sqlRepresentation =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("SELECT * FROM test")
+            .dialect("spark")
+            .build();
+    ImmutableViewVersion viewVersion =
+        ImmutableViewVersion.builder()
+            .versionId(1)
+            .timestampMillis(System.currentTimeMillis())
+            .schemaId(1)
+            .defaultNamespace(viewNamespace)
+            .addRepresentations(sqlRepresentation)
+            .build();
+    CreateViewRequest request =
+        ImmutableCreateViewRequest.builder()
+            .name(VIEW_NAME)
+            .schema(VIEW_SCHEMA)
+            .viewVersion(viewVersion)
+            .build();
+
+    LoadViewResponse expectedResponse = mock(LoadViewResponse.class);
+    when(mockExecutor.createView(mockContext, viewNamespace, request))
+        .thenReturn(expectedResponse);
+
+    // Simulate import failure
+    doThrow(new RuntimeException("Import failed"))
+        .when(mockViewDispatcher)
+        .loadView(any(NameIdentifier.class));
+
+    // Should not throw - import is best-effort
+    LoadViewResponse actualResponse =
+        hookDispatcher.createView(mockContext, viewNamespace, request);
+
+    assertEquals(expectedResponse, actualResponse);
+    verify(mockExecutor, times(1)).createView(mockContext, viewNamespace, request);
+  }
+
+  /**
+   * Dropping a view must delegate to the wrapped dispatcher and delete the matching VIEW entity
+   * from the entity store.
+   */
+  @Test
+  public void testDropViewRemovesFromEntityStore() throws Exception {
+    TableIdentifier icebergView = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME);
+    // Gravitino-side identifier the hook is expected to delete
+    NameIdentifier gravitinoView =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, icebergView);
+
+    hookDispatcher.dropView(mockContext, icebergView);
+
+    // The drop reached the underlying catalog ...
+    verify(mockExecutor, times(1)).dropView(mockContext, icebergView);
+    // ... and the entity was removed from the store
+    verify(mockEntityStore, times(1)).delete(eq(gravitinoView), eq(Entity.EntityType.VIEW));
+  }
+
+  /**
+   * A {@link NoSuchEntityException} from the entity store during drop must be ignored: the drop
+   * completes without throwing.
+   */
+  @Test
+  public void testDropViewHandlesMissingEntity() throws Exception {
+    TableIdentifier icebergView = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME);
+    NameIdentifier gravitinoView =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, icebergView);
+
+    // The store reports that it holds no such view entity
+    NoSuchEntityException missingEntity = new NoSuchEntityException("Entity not found");
+    doThrow(missingEntity)
+        .when(mockEntityStore)
+        .delete(eq(gravitinoView), eq(Entity.EntityType.VIEW));
+
+    // Must complete normally - a missing entity is not an error
+    hookDispatcher.dropView(mockContext, icebergView);
+
+    verify(mockExecutor, times(1)).dropView(mockContext, icebergView);
+    verify(mockEntityStore, times(1)).delete(eq(gravitinoView), eq(Entity.EntityType.VIEW));
+  }
+
+  @Test
+  public void testDropViewHandlesIOException() throws Exception {
+    TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), 
VIEW_NAME);
+
+    // Simulate IO error
+    NameIdentifier expectedIdent =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, 
viewIdent);
+    doThrow(new IOException("IO error"))
+        .when(mockEntityStore)
+        .delete(eq(expectedIdent), eq(Entity.EntityType.VIEW));
+
+    // Should throw RuntimeException wrapping the IOException
+    RuntimeException exception =
+        assertThrows(RuntimeException.class, () -> 
hookDispatcher.dropView(mockContext, viewIdent));
+
+    assertEquals("Failed to delete view entity from store", 
exception.getMessage());
+    verify(mockExecutor, times(1)).dropView(mockContext, viewIdent);
+  }
+
+  /**
+   * Renaming a view must delegate to the wrapped dispatcher and issue one update against the
+   * entity store keyed by the source identifier.
+   */
+  @Test
+  public void testRenameViewUpdatesEntityStore() throws Exception {
+    Namespace schema = Namespace.of(SCHEMA_NAME);
+    TableIdentifier oldName = TableIdentifier.of(schema, "old_view");
+    TableIdentifier newName = TableIdentifier.of(schema, "new_view");
+    RenameTableRequest request =
+        RenameTableRequest.builder().withSource(oldName).withDestination(newName).build();
+    // Gravitino-side identifier of the view before the rename
+    NameIdentifier expectedSource =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, oldName);
+
+    hookDispatcher.renameView(mockContext, request);
+
+    // The rename reached the underlying catalog
+    verify(mockExecutor, times(1)).renameView(mockContext, request);
+
+    // The store entry for the source view was updated (updater function itself is not inspected)
+    verify(mockEntityStore, times(1))
+        .update(eq(expectedSource), eq(GenericEntity.class), eq(Entity.EntityType.VIEW), any());
+  }
+
+  /**
+   * A {@link NoSuchEntityException} from the entity store during rename must be ignored: the
+   * rename completes without throwing.
+   */
+  @Test
+  public void testRenameViewHandlesMissingEntity() throws Exception {
+    Namespace schema = Namespace.of(SCHEMA_NAME);
+    TableIdentifier oldName = TableIdentifier.of(schema, "old_view");
+    TableIdentifier newName = TableIdentifier.of(schema, "new_view");
+    RenameTableRequest request =
+        RenameTableRequest.builder().withSource(oldName).withDestination(newName).build();
+
+    // The store reports that the source view entity does not exist
+    NameIdentifier expectedSource =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, oldName);
+    when(mockEntityStore.update(
+            eq(expectedSource), eq(GenericEntity.class), eq(Entity.EntityType.VIEW), any()))
+        .thenThrow(new NoSuchEntityException("Entity not found"));
+
+    // Must complete normally - a missing entity is not an error
+    hookDispatcher.renameView(mockContext, request);
+
+    verify(mockExecutor, times(1)).renameView(mockContext, request);
+  }
+
+  /**
+   * An {@link IOException} from the entity store during rename must surface as a
+   * {@link RuntimeException} with a fixed message, after the delegate has been called.
+   */
+  @Test
+  public void testRenameViewHandlesIOException() throws Exception {
+    Namespace schema = Namespace.of(SCHEMA_NAME);
+    TableIdentifier oldName = TableIdentifier.of(schema, "old_view");
+    TableIdentifier newName = TableIdentifier.of(schema, "new_view");
+    RenameTableRequest request =
+        RenameTableRequest.builder().withSource(oldName).withDestination(newName).build();
+
+    // The store fails with an IO error on update
+    NameIdentifier expectedSource =
+        IcebergIdentifierUtils.toGravitinoTableIdentifier(METALAKE, CATALOG, oldName);
+    when(mockEntityStore.update(
+            eq(expectedSource), eq(GenericEntity.class), eq(Entity.EntityType.VIEW), any()))
+        .thenThrow(new IOException("IO error"));
+
+    // The hook is expected to rethrow as a RuntimeException
+    RuntimeException thrown =
+        assertThrows(
+            RuntimeException.class, () -> hookDispatcher.renameView(mockContext, request));
+
+    assertEquals("Failed to rename view entity in store", thrown.getMessage());
+    verify(mockExecutor, times(1)).renameView(mockContext, request);
+  }
+
+  /**
+   * Replacing a view must be a pure pass-through: the wrapped dispatcher is called once, its
+   * response is returned unchanged, and the entity store sees neither an update nor a delete.
+   *
+   * @throws Exception declared only because the verified {@link EntityStore} methods declare
+   *     checked exceptions; matches the other tests in this class
+   */
+  @Test
+  public void testReplaceViewDelegatesWithoutModification() throws Exception {
+    TableIdentifier viewIdent = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME);
+    UpdateTableRequest replaceRequest = mock(UpdateTableRequest.class);
+    LoadViewResponse mockResponse = mock(LoadViewResponse.class);
+
+    when(mockExecutor.replaceView(mockContext, viewIdent, replaceRequest))
+        .thenReturn(mockResponse);
+
+    LoadViewResponse response = hookDispatcher.replaceView(mockContext, viewIdent, replaceRequest);
+
+    verify(mockExecutor, times(1)).replaceView(mockContext, viewIdent, replaceRequest);
+    assertEquals(mockResponse, response);
+
+    // Verify no entity store operations were performed
+    verify(mockEntityStore, never()).update(any(), any(), any(), any());
+    verify(mockEntityStore, never()).delete(any(), any());
+  }
+
+  /** Loading a view must call the wrapped dispatcher once and return its response unchanged. */
+  @Test
+  public void testLoadViewDelegatesWithoutModification() {
+    TableIdentifier icebergView = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME);
+    LoadViewResponse expectedResponse = mock(LoadViewResponse.class);
+    when(mockExecutor.loadView(mockContext, icebergView)).thenReturn(expectedResponse);
+
+    LoadViewResponse actualResponse = hookDispatcher.loadView(mockContext, icebergView);
+
+    // Exactly one delegated call, response handed back as-is
+    verify(mockExecutor, times(1)).loadView(mockContext, icebergView);
+    assertEquals(expectedResponse, actualResponse);
+  }
+
+  /** Listing views must call the wrapped dispatcher once and return its response unchanged. */
+  @Test
+  public void testListViewDelegatesWithoutModification() {
+    Namespace viewNamespace = Namespace.of(SCHEMA_NAME);
+    ListTablesResponse expectedResponse = mock(ListTablesResponse.class);
+    when(mockExecutor.listView(mockContext, viewNamespace)).thenReturn(expectedResponse);
+
+    ListTablesResponse actualResponse = hookDispatcher.listView(mockContext, viewNamespace);
+
+    // Exactly one delegated call, response handed back as-is
+    verify(mockExecutor, times(1)).listView(mockContext, viewNamespace);
+    assertEquals(expectedResponse, actualResponse);
+  }
+
+  /** The existence check must call the wrapped dispatcher once and return its answer. */
+  @Test
+  public void testViewExistsDelegatesWithoutModification() {
+    TableIdentifier icebergView = TableIdentifier.of(Namespace.of(SCHEMA_NAME), VIEW_NAME);
+    // The underlying catalog says the view is present
+    when(mockExecutor.viewExists(mockContext, icebergView)).thenReturn(true);
+
+    boolean reported = hookDispatcher.viewExists(mockContext, icebergView);
+
+    verify(mockExecutor, times(1)).viewExists(mockContext, icebergView);
+    assertEquals(true, reported);
+  }
+}


Reply via email to