This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new eb3cc8cf64 [#10997] feat(catalog-hive): support view operations 
(#10998)
eb3cc8cf64 is described below

commit eb3cc8cf6459bac8af0bd69a7810d5beb088cd1f
Author: mchades <[email protected]>
AuthorDate: Tue May 19 16:42:04 2026 +0800

    [#10997] feat(catalog-hive): support view operations (#10998)
    
    ### What changes were proposed in this pull request?
    
    This PR adds and refines view operations for the Hive catalog.
    
    - Implement and organize view CRUD logic in `HiveViewCatalogOperations`.
    - Keep `HiveCatalogOperations` as a delegating entry for view
    operations.
    - Return explicit unsupported errors for unsupported dialect paths.
    - Update unit/integration tests for Hive catalog view behavior.
    
    ### Why are the changes needed?
    
    The Hive catalog needs complete view operation support, and the view logic
    should be isolated to keep the main operations class maintainable.
    
    Fix: #10997
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Hive catalog view operations are supported, and unsupported dialect
    paths return explicit errors.
    
    ### How was this patch tested?
    
    - `./gradlew :catalogs:catalog-hive:test --tests
    org.apache.gravitino.catalog.hive.TestHiveCatalogOperations --tests
    org.apache.gravitino.catalog.hive.integration.test.CatalogHiveViewIT
    :catalogs:hive-metastore-common:test --tests
    org.apache.gravitino.hive.converter.TestHiveTableConverter
    -PskipDockerTests=false --no-daemon`
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../catalog/hive/HiveCatalogOperations.java        | 144 +++-
 .../apache/gravitino/catalog/hive/HiveView.java    | 159 +++++
 .../catalog/hive/HiveViewCatalogOperations.java    | 460 +++++++++++++
 .../catalog/hive/TestHiveCatalogOperations.java    | 736 +++++++++++++++++++++
 .../gravitino/catalog/hive/TestHiveView.java       |  80 +++
 .../hive/integration/test/CatalogHiveViewIT.java   | 584 ++++++++++++++++
 .../java/org/apache/gravitino/hive/HiveTable.java  |  24 +
 .../apache/gravitino/hive/client/HiveClient.java   |   3 +
 .../gravitino/hive/client/HiveClientImpl.java      |   6 +
 .../org/apache/gravitino/hive/client/HiveShim.java |   3 +
 .../apache/gravitino/hive/client/HiveShimV2.java   |  11 +
 .../apache/gravitino/hive/client/HiveShimV3.java   |  20 +
 .../hive/converter/HiveTableConverter.java         |  60 +-
 .../apache/gravitino/hive/client/TestHive2HMS.java |   6 +
 .../hive/converter/TestHiveTableConverter.java     |  76 +++
 15 files changed, 2356 insertions(+), 16 deletions(-)

diff --git 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
index 42cac99c08..3f8c37d7b2 100644
--- 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
+++ 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
@@ -63,17 +63,23 @@ import 
org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NoSuchViewException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.exceptions.ViewAlreadyExistsException;
 import org.apache.gravitino.hive.CachedClientPool;
 import org.apache.gravitino.hive.HiveSchema;
 import org.apache.gravitino.hive.HiveTable;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Representation;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.View;
+import org.apache.gravitino.rel.ViewCatalog;
+import org.apache.gravitino.rel.ViewChange;
 import org.apache.gravitino.rel.expressions.Expression;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
@@ -88,7 +94,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Operations for interacting with an Apache Hive catalog in Apache 
Gravitino. */
-public class HiveCatalogOperations implements CatalogOperations, 
SupportsSchemas, TableCatalog {
+public class HiveCatalogOperations
+    implements CatalogOperations, SupportsSchemas, TableCatalog, ViewCatalog {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogOperations.class);
 
@@ -100,11 +107,13 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
   private HasPropertyMetadata propertiesMetadata;
 
   private String catalogName;
+  private HiveViewCatalogOperations viewCatalogOperations;
 
   private boolean listAllTables = true;
   // The maximum number of tables that can be returned by the 
listTableNamesByFilter function.
   // The default value is -1, which means that all tables are returned.
   private static final short MAX_TABLES = -1;
+  static final String ALL_TABLE_PATTERN = "*";
 
   // Map that maintains the mapping of keys in Gravitino to that in Hive, for 
example, users
   // will only need to set the configuration 'METASTORE_URL' in Gravitino and 
Gravitino will change
@@ -144,6 +153,8 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
                 .catalogPropertiesMetadata()
                 .getOrDefault(conf, 
HiveCatalogPropertiesMetadata.DEFAULT_CATALOG);
     this.catalogName = defaultCatalog;
+    this.viewCatalogOperations =
+        new HiveViewCatalogOperations(() -> clientPool, () -> catalogName, 
this::schemaExists);
   }
 
   @VisibleForTesting
@@ -376,11 +387,22 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
       // then based on
       // those names we can obtain metadata for each individual table and get 
the type we needed.
       List<String> allTables = clientPool.run(c -> c.getAllTables(catalogName, 
schemaIdent.name()));
+      // Always filter out VIRTUAL_VIEW entries so they don't appear in table 
listings
+      List<String> views =
+          clientPool.run(
+              c ->
+                  c.listTablesByType(
+                      catalogName,
+                      schemaIdent.name(),
+                      ALL_TABLE_PATTERN,
+                      TableType.VIRTUAL_VIEW.name()));
+      allTables.removeAll(views);
+
       if (!listAllTables) {
         // The reason for using the listTableNamesByFilter function is that the
         // getTableObjectiesByName function has poor performance. Currently, 
we focus on the
         // Iceberg, Paimon and Hudi table. In the future, if necessary, we 
will need to filter out
-        // other tables. In addition, the current return also includes tables 
of type VIRTUAL-VIEW.
+        // other tables.
         String icebergAndPaimonFilter = getIcebergAndPaimonFilter();
         List<String> icebergAndPaimonTables =
             clientPool.run(
@@ -398,6 +420,7 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
                         catalogName, schemaIdent.name(), hudiFilter, 
MAX_TABLES));
         removeHudiTables(allTables, hudiTables);
       }
+
       return allTables.stream()
           .map(tbName -> NameIdentifier.of(namespace, tbName))
           .toArray(NameIdentifier[]::new);
@@ -1009,6 +1032,123 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
     return clientPool;
   }
 
+  /**
+   * Replaces the view operations delegate that the view methods of this class forward to.
+   * Intended for tests only.
+   *
+   * @param viewCatalogOperations The delegate to use for subsequent view operations.
+   */
+  @VisibleForTesting
+  void setViewCatalogOperations(HiveViewCatalogOperations 
viewCatalogOperations) {
+    this.viewCatalogOperations = viewCatalogOperations;
+  }
+
+  // ==================== ViewCatalog implementation ====================
+
+  /**
+   * Lists all views in the given schema namespace.
+   *
+   * @param namespace The namespace of the schema.
+   * @return An array of view identifiers.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   */
+  @Override
+  public NameIdentifier[] listViews(Namespace namespace) throws NoSuchSchemaException {
+    // View listing is handled entirely by the dedicated view operations delegate.
+    HiveViewCatalogOperations viewOperations = getViewCatalogOperations();
+    return viewOperations.listViews(namespace);
+  }
+
+  /**
+   * Loads a view from the Hive Metastore.
+   *
+   * @param ident The view identifier.
+   * @return The loaded view.
+   * @throws NoSuchViewException If the view does not exist.
+   */
+  @Override
+  public View loadView(NameIdentifier ident) throws NoSuchViewException {
+    // Forward to the view operations delegate, which performs the metastore lookup.
+    HiveViewCatalogOperations viewOperations = getViewCatalogOperations();
+    return viewOperations.loadView(ident);
+  }
+
+  /**
+   * Creates a view in the Hive Metastore.
+   *
+   * <p>Hive Metastore stores exactly one engine-native SQL dialect for each 
view. This
+   * implementation currently supports only {@code hive}.
+   *
+   * <p>TODO(design-docs/gravitino-logical-view-management.md): support {@code 
trino} and {@code
+   * spark} HMS view formats.
+   *
+   * @param ident The view identifier.
+   * @param comment An optional comment.
+   * @param columns The output columns of the view.
+   * @param representations The SQL representations (must contain exactly one 
{@code hive} dialect).
+   * @param defaultCatalog The default catalog used to resolve unqualified 
identifiers referenced by
+   *     the view definition. For {@code hive} dialect this must be {@code 
null}; non-null values
+   *     are rejected.
+   * @param defaultSchema The default schema used to resolve unqualified 
identifiers referenced by
+   *     the view definition. For {@code hive} dialect this must be {@code 
null}; non-null values
+   *     are rejected.
+   * @param properties Additional properties stored in HMS.
+   * @return The created view.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   * @throws ViewAlreadyExistsException If the view already exists.
+   */
+  @Override
+  public View createView(
+      NameIdentifier ident,
+      String comment,
+      Column[] columns,
+      Representation[] representations,
+      String defaultCatalog,
+      String defaultSchema,
+      Map<String, String> properties)
+      throws NoSuchSchemaException, ViewAlreadyExistsException {
+    // All arguments are passed through unchanged; the delegate does the validation and creation.
+    HiveViewCatalogOperations viewOperations = getViewCatalogOperations();
+    return viewOperations.createView(
+        ident, comment, columns, representations, defaultCatalog, defaultSchema, properties);
+  }
+
+  /**
+   * Alters a view in the Hive Metastore.
+   *
+   * <p>Supported changes: rename, set/remove property, and replace the view 
definition.
+   *
+   * @param ident The view identifier.
+   * @param changes The changes to apply.
+   * @return The updated view.
+   * @throws NoSuchViewException If the view does not exist.
+   * @throws ViewAlreadyExistsException If a rename target already exists.
+   */
+  @Override
+  public View alterView(NameIdentifier ident, ViewChange... changes)
+      throws NoSuchViewException, ViewAlreadyExistsException {
+    // The change list is forwarded as-is to the view operations delegate.
+    HiveViewCatalogOperations viewOperations = getViewCatalogOperations();
+    return viewOperations.alterView(ident, changes);
+  }
+
+  /**
+   * Drops a view from the Hive Metastore.
+   *
+   * @param ident The view identifier.
+   * @return {@code true} if the view was dropped, {@code false} if it did not 
exist.
+   */
+  @Override
+  public boolean dropView(NameIdentifier ident) {
+    // Forward to the view operations delegate and report its result unchanged.
+    HiveViewCatalogOperations viewOperations = getViewCatalogOperations();
+    return viewOperations.dropView(ident);
+  }
+
+  /**
+   * Checks whether a view with the given identifier exists.
+   *
+   * @param ident The view identifier.
+   * @return {@code true} if the view exists, {@code false} otherwise.
+   */
+  @Override
+  public boolean viewExists(NameIdentifier ident) {
+    // Forward to the view operations delegate and report its result unchanged.
+    HiveViewCatalogOperations viewOperations = getViewCatalogOperations();
+    return viewOperations.viewExists(ident);
+  }
+
+  /**
+   * Returns the view operations delegate, creating it on first use when none has been set yet.
+   *
+   * <p>The delegate receives suppliers rather than values, so it reads the client pool and catalog
+   * name held by this instance each time it needs them.
+   *
+   * @return The delegate that implements the view operations.
+   */
+  private HiveViewCatalogOperations getViewCatalogOperations() {
+    if (viewCatalogOperations != null) {
+      return viewCatalogOperations;
+    }
+    viewCatalogOperations =
+        new HiveViewCatalogOperations(() -> clientPool, () -> catalogName, this::schemaExists);
+    return viewCatalogOperations;
+  }
+
   private boolean isExternalTable(NameIdentifier tableIdent) {
     HiveTableHandle hiveTable = loadHiveTable(tableIdent);
     return EXTERNAL_TABLE.name().equalsIgnoreCase(hiveTable.getTableType());
diff --git 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveView.java
 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveView.java
new file mode 100644
index 0000000000..1e28045c55
--- /dev/null
+++ 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveView.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hive;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.annotation.Unstable;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Dialects;
+import org.apache.gravitino.rel.Representation;
+import org.apache.gravitino.rel.SQLRepresentation;
+import org.apache.gravitino.rel.View;
+
+/**
+ * Represents a view stored in Hive Metastore (VIRTUAL_VIEW table type).
+ *
+ * <p>The SQL dialect of a stored view is inferred by {@code detectDialect} from the HMS table
+ * parameters and the original view text: a view is treated as Trino when the {@code presto_view}
+ * parameter is {@code true} or when its text starts with "/* Presto View:", as Spark when the
+ * parameters contain {@code spark.sql.create.version}, and as a native Hive SQL view otherwise.
+ */
+@Unstable
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@Builder(setterPrefix = "with", builderClassName = "Builder")
+@EqualsAndHashCode
+@ToString
+public class HiveView implements View {
+
+  // HMS table parameter whose presence marks a view as Spark-created (see detectDialect).
+  private static final String SPARK_VERSION_KEY = "spark.sql.create.version";
+  // HMS table parameter that marks a Trino view when its value is "true" (case-insensitive).
+  private static final String TRINO_VIEW_MARKER_KEY = "presto_view";
+  // Prefix of the original view text that marks a Trino view.
+  private static final String TRINO_VIEW_PREFIX = "/* Presto View:";
+
+  private String name;
+  private String comment;
+  private Column[] columns;
+  private String defaultCatalog;
+  private String defaultSchema;
+  private Map<String, String> properties;
+  private AuditInfo auditInfo;
+  private SQLRepresentation[] representations;
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String comment() {
+    return comment;
+  }
+
+  /**
+   * Returns the output columns of the view.
+   *
+   * @return The stored column array, or an empty array when no columns were set.
+   */
+  @Override
+  public Column[] columns() {
+    return columns == null ? new Column[0] : columns;
+  }
+
+  @Override
+  @Nullable
+  public String defaultCatalog() {
+    return defaultCatalog;
+  }
+
+  @Override
+  @Nullable
+  public String defaultSchema() {
+    return defaultSchema;
+  }
+
+  /**
+   * Returns the SQL representations of the view.
+   *
+   * @return The stored representations, or an empty array when none were set.
+   */
+  @Override
+  public Representation[] representations() {
+    return representations == null ? new SQLRepresentation[0] : 
representations;
+  }
+
+  /**
+   * Returns the properties of the view.
+   *
+   * @return The property map held by this view. For views created through {@code Builder.build()}
+   *     this is a private {@link HashMap} copy of the supplied properties and is never null.
+   */
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public AuditInfo auditInfo() {
+    return auditInfo;
+  }
+
+  /**
+   * Detects the SQL dialect from HMS table properties and view text.
+   *
+   * <p>The checks run in this order: the Trino marker parameter, the Trino view-text prefix, then
+   * the Spark version parameter. A {@code null} parameter map skips the parameter-based checks.
+   *
+   * @param viewOriginalText The original view text from HMS.
+   * @param parameters The HMS table parameters map, may be {@code null}.
+   * @return The detected dialect: {@code Dialects.TRINO}, {@code Dialects.SPARK}, or {@code
+   *     Dialects.HIVE} when neither of the other two is detected.
+   */
+  static String detectDialect(String viewOriginalText, Map<String, String> 
parameters) {
+    if (parameters != null && 
"true".equalsIgnoreCase(parameters.get(TRINO_VIEW_MARKER_KEY))) {
+      return Dialects.TRINO;
+    }
+    if (StringUtils.startsWith(viewOriginalText, TRINO_VIEW_PREFIX)) {
+      return Dialects.TRINO;
+    }
+    if (parameters != null && parameters.containsKey(SPARK_VERSION_KEY)) {
+      return Dialects.SPARK;
+    }
+    return Dialects.HIVE;
+  }
+
+  /**
+   * Builder for {@link HiveView}.
+   *
+   * <p>Only {@code build()} is written by hand, so that it can validate its input; the {@code
+   * with*} setters are expected to come from the Lombok {@code @Builder} annotation on the class.
+   */
+  public static class Builder {
+    /**
+     * Builds the {@link HiveView} instance.
+     *
+     * <p>Requires a non-empty name and a non-empty representation array without null elements. The
+     * supplied properties are copied into a new {@link HashMap}; a null map becomes an empty one.
+     *
+     * @return The constructed view.
+     * @throws IllegalArgumentException If the name or the representations fail validation.
+     */
+    public HiveView build() {
+      Preconditions.checkArgument(StringUtils.isNotEmpty(name), "View name is 
required");
+      Preconditions.checkArgument(
+          ArrayUtils.isNotEmpty(representations), "Representations must not be 
null or empty");
+      for (SQLRepresentation representation : representations) {
+        Preconditions.checkArgument(
+            representation != null, "Representations must not contain null 
elements");
+      }
+
+      // Copy the properties so later changes to the caller's map do not affect the built view.
+      Map<String, String> normalizedProperties =
+          properties == null ? new HashMap<>() : new HashMap<>(properties);
+      return new HiveView(
+          name,
+          comment,
+          columns,
+          defaultCatalog,
+          defaultSchema,
+          normalizedProperties,
+          auditInfo,
+          representations);
+    }
+  }
+}
diff --git 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveViewCatalogOperations.java
 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveViewCatalogOperations.java
new file mode 100644
index 0000000000..e35b265949
--- /dev/null
+++ 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveViewCatalogOperations.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hive;
+
+import static 
org.apache.gravitino.catalog.hive.HiveCatalogOperations.ALL_TABLE_PATTERN;
+import static org.apache.gravitino.catalog.hive.HiveConstants.COMMENT;
+import static org.apache.gravitino.catalog.hive.HiveConstants.TABLE_TYPE;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.NoSuchViewException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.exceptions.ViewAlreadyExistsException;
+import org.apache.gravitino.hive.CachedClientPool;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Dialects;
+import org.apache.gravitino.rel.Representation;
+import org.apache.gravitino.rel.SQLRepresentation;
+import org.apache.gravitino.rel.View;
+import org.apache.gravitino.rel.ViewCatalog;
+import org.apache.gravitino.rel.ViewChange;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class HiveViewCatalogOperations implements ViewCatalog {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveViewCatalogOperations.class);
+
+  private final Supplier<CachedClientPool> clientPoolSupplier;
+  private final Supplier<String> catalogNameSupplier;
+  private final Predicate<NameIdentifier> schemaExistsChecker;
+
+  /**
+   * Creates the view operations helper.
+   *
+   * <p>The client pool and catalog name are passed as suppliers and stored as such, so they are
+   * not resolved at construction time.
+   *
+   * @param clientPoolSupplier Supplies the Hive client pool used for metastore calls.
+   * @param catalogNameSupplier Supplies the Hive catalog name passed to metastore calls.
+   * @param schemaExistsChecker Tests whether the schema with the given identifier exists.
+   */
+  HiveViewCatalogOperations(
+      Supplier<CachedClientPool> clientPoolSupplier,
+      Supplier<String> catalogNameSupplier,
+      Predicate<NameIdentifier> schemaExistsChecker) {
+    this.clientPoolSupplier = clientPoolSupplier;
+    this.catalogNameSupplier = catalogNameSupplier;
+    this.schemaExistsChecker = schemaExistsChecker;
+  }
+
+  /**
+   * Lists the identifiers of all views in the given schema.
+   *
+   * <p>Views are the metastore entries returned by {@code listTablesByType} for the {@code
+   * VIRTUAL_VIEW} table type.
+   *
+   * @param namespace The namespace of the schema to list views from.
+   * @return The identifiers of the views found in the schema.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   */
+  @Override
+  public NameIdentifier[] listViews(Namespace namespace) throws NoSuchSchemaException {
+    NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
+    if (!schemaExistsChecker.test(schemaIdent)) {
+      throw new NoSuchSchemaException("Schema %s does not exist", namespace);
+    }
+    try {
+      List<String> views =
+          clientPool()
+              .run(
+                  c ->
+                      c.listTablesByType(
+                          catalogName(),
+                          schemaIdent.name(),
+                          ALL_TABLE_PATTERN,
+                          TableType.VIRTUAL_VIEW.name()));
+      return views.stream()
+          .map(name -> NameIdentifier.of(namespace, name))
+          .toArray(NameIdentifier[]::new);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interrupt.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Failed to list Hive views in " + namespace, e);
+    }
+  }
+
+  /**
+   * Loads a view from the Hive Metastore.
+   *
+   * @param ident The view identifier.
+   * @return The loaded view.
+   * @throws NoSuchViewException If no view with this identifier exists, including the case where
+   *     the name refers to an entry that is not a {@code VIRTUAL_VIEW}.
+   */
+  @Override
+  public View loadView(NameIdentifier ident) throws NoSuchViewException {
+    return loadHiveView(ident);
+  }
+
+  /**
+   * Creates a view in the Hive Metastore.
+   *
+   * <p>The view is stored through {@code createTable} as an entry whose {@code TABLE_TYPE}
+   * property is {@code VIRTUAL_VIEW}. The representations and default catalog/schema are checked
+   * by {@code validateSQLRepresentation} before any metastore call is made.
+   *
+   * @param ident The view identifier.
+   * @param comment An optional comment.
+   * @param columns The output columns of the view.
+   * @param representations The SQL representations of the view.
+   * @param defaultCatalog The default catalog for unqualified identifiers in the view definition.
+   * @param defaultSchema The default schema for unqualified identifiers in the view definition.
+   * @param properties Additional properties to store; may be {@code null}.
+   * @return The created view.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   * @throws ViewAlreadyExistsException If the metastore reports that the name is already taken.
+   */
+  @Override
+  public View createView(
+      NameIdentifier ident,
+      String comment,
+      Column[] columns,
+      Representation[] representations,
+      String defaultCatalog,
+      String defaultSchema,
+      Map<String, String> properties)
+      throws NoSuchSchemaException, ViewAlreadyExistsException {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    if (!schemaExistsChecker.test(schemaIdent)) {
+      throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent);
+    }
+    SQLRepresentation sqlRepresentation =
+        validateSQLRepresentation(representations, defaultCatalog, defaultSchema, ident);
+
+    try {
+      // Work on a copy so the caller's map is not modified when the table type is added.
+      Map<String, String> params =
+          Maps.newHashMap(properties == null ? ImmutableMap.of() : properties);
+      params.put(TABLE_TYPE, TableType.VIRTUAL_VIEW.name());
+      String viewOriginalText = toHmsViewOriginalText(sqlRepresentation, ident);
+
+      HiveTable hiveTable =
+          HiveTable.builder()
+              .withName(ident.name())
+              .withComment(comment)
+              .withColumns(copyColumns(columns))
+              .withProperties(params)
+              .withAuditInfo(
+                  AuditInfo.builder()
+                      .withCreator(PrincipalUtils.getCurrentUserName())
+                      .withCreateTime(Instant.now())
+                      .build())
+              .withCatalogName(catalogName())
+              .withDatabaseName(schemaIdent.name())
+              .withViewOriginalText(viewOriginalText)
+              .build();
+
+      clientPool()
+          .run(
+              c -> {
+                c.createTable(hiveTable);
+                return null;
+              });
+
+      LOG.info("Created Hive view {} in Hive Metastore", ident.name());
+      return toHiveView(
+          ident,
+          hiveTable.comment(),
+          hiveTable.properties(),
+          hiveTable.viewOriginalText(),
+          hiveTable.columns(),
+          hiveTable.auditInfo());
+    } catch (TableAlreadyExistsException e) {
+      throw new ViewAlreadyExistsException(e, "View %s already exists in Hive Metastore", ident);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interrupt.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Failed to create Hive view " + ident, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create Hive view " + ident, e);
+    }
+  }
+
+  /**
+   * Alters a view in the Hive Metastore.
+   *
+   * <p>The changes are applied in order to an in-memory copy of the current view, which is then
+   * written back with a single {@code alterTable} call. Supported changes are {@link
+   * ViewChange.RenameView}, {@link ViewChange.SetProperty}, {@link ViewChange.RemoveProperty} and
+   * {@link ViewChange.ReplaceView}. Setting or removing the {@code COMMENT} property updates the
+   * view comment instead of the property map.
+   *
+   * @param ident The view identifier.
+   * @param changes The changes to apply.
+   * @return The updated view, identified by its new name when it was renamed.
+   * @throws NoSuchViewException If the view does not exist or the name refers to a non-view entry.
+   * @throws ViewAlreadyExistsException If a rename target already exists.
+   * @throws IllegalArgumentException If a change type is not supported.
+   */
+  @Override
+  public View alterView(NameIdentifier ident, ViewChange... changes)
+      throws NoSuchViewException, ViewAlreadyExistsException {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+
+    try {
+      HiveTable currentHiveTable =
+          clientPool().run(c -> c.getTable(catalogName(), schemaIdent.name(), ident.name()));
+      if (!TableType.VIRTUAL_VIEW
+          .name()
+          .equalsIgnoreCase(currentHiveTable.properties().get(TABLE_TYPE))) {
+        throw new NoSuchViewException("No view named %s (it is a table, not a view)", ident.name());
+      }
+
+      String newViewName = currentHiveTable.name();
+      String updatedViewOriginalText = currentHiveTable.viewOriginalText();
+      Map<String, String> updatedProperties = Maps.newHashMap(currentHiveTable.properties());
+      Column[] updatedColumns = copyColumns(currentHiveTable.columns());
+      String updatedComment = currentHiveTable.comment();
+      // The comment is tracked in updatedComment, so drop it from the property map.
+      updatedProperties.remove(COMMENT);
+
+      for (ViewChange change : changes) {
+        if (change instanceof ViewChange.RenameView) {
+          String renameTarget = ((ViewChange.RenameView) change).getNewName();
+          NameIdentifier targetIdent = NameIdentifier.of(ident.namespace(), renameTarget);
+          if (viewExists(targetIdent)) {
+            throw new ViewAlreadyExistsException(
+                "View %s already exists in Hive Metastore", targetIdent);
+          }
+          newViewName = renameTarget;
+        } else if (change instanceof ViewChange.SetProperty) {
+          ViewChange.SetProperty sp = (ViewChange.SetProperty) change;
+          if (COMMENT.equals(sp.getProperty())) {
+            updatedComment = sp.getValue();
+          } else {
+            updatedProperties.put(sp.getProperty(), sp.getValue());
+          }
+        } else if (change instanceof ViewChange.RemoveProperty) {
+          String property = ((ViewChange.RemoveProperty) change).getProperty();
+          if (COMMENT.equals(property)) {
+            updatedComment = null;
+          } else {
+            updatedProperties.remove(property);
+          }
+        } else if (change instanceof ViewChange.ReplaceView) {
+          ViewChange.ReplaceView replace = (ViewChange.ReplaceView) change;
+          SQLRepresentation sqlRepresentation =
+              validateSQLRepresentation(
+                  replace.getRepresentations(),
+                  replace.getDefaultCatalog(),
+                  replace.getDefaultSchema(),
+                  ident);
+          updatedColumns = copyColumns(replace.getColumns());
+          updatedComment = replace.getComment();
+          updatedViewOriginalText = toHmsViewOriginalText(sqlRepresentation, ident);
+        } else {
+          throw new IllegalArgumentException(
+              "Unsupported view change type: " + change.getClass().getSimpleName());
+        }
+      }
+
+      HiveTable updatedHiveTable =
+          buildAlteredHiveView(
+              currentHiveTable,
+              schemaIdent,
+              newViewName,
+              updatedComment,
+              updatedProperties,
+              updatedColumns,
+              updatedViewOriginalText);
+
+      final String originalName = ident.name();
+      clientPool()
+          .run(
+              c -> {
+                c.alterTable(catalogName(), schemaIdent.name(), originalName, updatedHiveTable);
+                return null;
+              });
+
+      LOG.info("Altered Hive view {} (now {})", ident.name(), newViewName);
+      NameIdentifier updatedIdent = NameIdentifier.of(ident.namespace(), newViewName);
+      return toHiveView(
+          updatedIdent,
+          updatedHiveTable.comment(),
+          updatedHiveTable.properties(),
+          updatedHiveTable.viewOriginalText(),
+          updatedHiveTable.columns(),
+          updatedHiveTable.auditInfo());
+    } catch (NoSuchTableException e) {
+      throw new NoSuchViewException(e, "View %s does not exist in Hive Metastore", ident);
+    } catch (TableAlreadyExistsException e) {
+      throw new ViewAlreadyExistsException(
+          e,
+          "View %s already exists in Hive Metastore",
+          NameIdentifier.of(ident.namespace(), extractRenameTargetName(ident.name(), changes)));
+    } catch (NoSuchViewException
+        | ViewAlreadyExistsException
+        | IllegalArgumentException
+        | UnsupportedOperationException e) {
+      // These already carry the right type for callers; do not wrap them in the generic handler.
+      throw e;
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interrupt.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Failed to alter Hive view " + ident, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to alter Hive view " + ident, e);
+    }
+  }
+
+  /**
+   * Builds the {@link HiveTable} that describes a view after its changes have been applied.
+   *
+   * <p>The audit info is taken unchanged from the current view; everything else comes from the
+   * arguments.
+   *
+   * @param currentHiveTable The view as currently stored; only its audit info is reused.
+   * @param schemaIdent The identifier of the schema that contains the view.
+   * @param viewName The name of the view after the changes.
+   * @param comment The comment after the changes; may be {@code null}.
+   * @param properties The properties after the changes.
+   * @param columns The columns after the changes; passed through {@code copyColumns}.
+   * @param viewOriginalText The view text after the changes.
+   * @return The table object to hand to the metastore.
+   */
+  private HiveTable buildAlteredHiveView(
+      HiveTable currentHiveTable,
+      NameIdentifier schemaIdent,
+      String viewName,
+      String comment,
+      Map<String, String> properties,
+      Column[] columns,
+      String viewOriginalText) {
+    return HiveTable.builder()
+        .withName(viewName)
+        .withComment(comment)
+        .withColumns(copyColumns(columns))
+        .withProperties(properties)
+        .withAuditInfo(currentHiveTable.auditInfo())
+        .withCatalogName(catalogName())
+        .withDatabaseName(schemaIdent.name())
+        .withViewOriginalText(viewOriginalText)
+        .build();
+  }
+
+  /**
+   * Drops a view from the Hive Metastore.
+   *
+   * <p>The entry is loaded first and dropped only when its {@code TABLE_TYPE} property is {@code
+   * VIRTUAL_VIEW}, so this method does not drop an entry of any other type.
+   *
+   * @param ident The view identifier.
+   * @return {@code true} if the view was dropped; {@code false} if no entry with this name exists
+   *     or the entry is not a view.
+   */
+  @Override
+  public boolean dropView(NameIdentifier ident) {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    try {
+      HiveTable hiveTable =
+          clientPool().run(c -> c.getTable(catalogName(), schemaIdent.name(), ident.name()));
+      if (!TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(hiveTable.properties().get(TABLE_TYPE))) {
+        return false;
+      }
+
+      clientPool()
+          .run(
+              c -> {
+                c.dropTable(catalogName(), schemaIdent.name(), ident.name(), false, false);
+                return null;
+              });
+      LOG.info("Dropped Hive view {}", ident.name());
+      return true;
+    } catch (NoSuchTableException e) {
+      return false;
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interrupt.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Failed to drop Hive view " + ident, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to drop Hive view " + ident, e);
+    }
+  }
+
+  /**
+   * Checks whether a view with the given identifier exists by attempting to load it.
+   *
+   * <p>Only {@link NoSuchViewException} is translated to {@code false}; any other exception
+   * thrown while loading, such as the {@link UnsupportedOperationException} that {@code
+   * loadHiveView} rethrows, propagates to the caller.
+   *
+   * @param ident The view identifier.
+   * @return {@code true} if the view could be loaded, {@code false} if it does not exist.
+   */
+  @Override
+  public boolean viewExists(NameIdentifier ident) {
+    try {
+      loadHiveView(ident);
+      return true;
+    } catch (NoSuchViewException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Loads the metastore entry for the given identifier and converts it to a {@link HiveView}.
+   *
+   * @param ident The view identifier.
+   * @return The loaded view.
+   * @throws NoSuchViewException If the entry does not exist or its {@code TABLE_TYPE} property is
+   *     not {@code VIRTUAL_VIEW}.
+   * @throws UnsupportedOperationException If raised while loading or converting the entry;
+   *     rethrown unchanged rather than wrapped.
+   */
+  private HiveView loadHiveView(NameIdentifier ident) throws NoSuchViewException {
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    try {
+      HiveTable hiveTable =
+          clientPool().run(c -> c.getTable(catalogName(), schemaIdent.name(), ident.name()));
+      if (!TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(hiveTable.properties().get(TABLE_TYPE))) {
+        throw new NoSuchViewException("No view named %s (it is a table, not a view)", ident);
+      }
+
+      return toHiveView(
+          ident,
+          hiveTable.comment(),
+          hiveTable.properties(),
+          hiveTable.viewOriginalText(),
+          hiveTable.columns(),
+          hiveTable.auditInfo());
+
+    } catch (NoSuchViewException | UnsupportedOperationException e) {
+      throw e;
+    } catch (NoSuchTableException e) {
+      throw new NoSuchViewException(e, "View %s does not exist in Hive Metastore", ident);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe the interrupt.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Failed to load Hive view " + ident, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to load Hive view " + ident, e);
+    }
+  }
+
+  /**
+   * Converts raw Hive Metastore view metadata into a {@link HiveView}.
+   *
+   * <p>The dialect is determined by {@code HiveView.detectDialect}; anything other than the Hive
+   * dialect is rejected. The resulting view carries exactly one SQL representation, tagged with
+   * the Hive dialect, whose SQL is {@code viewOriginalText} (or the empty string when that is
+   * {@code null}).
+   *
+   * @param ident identifier of the view; supplies the view name and appears in error messages
+   * @param comment view comment
+   * @param properties metastore properties; {@code null} is treated as an empty map
+   * @param viewOriginalText SQL text stored for the view
+   * @param columns view columns
+   * @param auditInfo audit info to attach to the view
+   * @return the converted view
+   * @throws UnsupportedOperationException if the detected dialect is not the Hive dialect
+   */
+  private HiveView toHiveView(
+      NameIdentifier ident,
+      String comment,
+      Map<String, String> properties,
+      String viewOriginalText,
+      Column[] columns,
+      AuditInfo auditInfo) {
+    // Work on a private mutable copy so the caller's map is never shared with the view.
+    Map<String, String> viewProperties = Maps.newHashMap();
+    if (properties != null) {
+      viewProperties.putAll(properties);
+    }
+
+    String dialect = HiveView.detectDialect(viewOriginalText, viewProperties);
+    boolean hiveDialect = Dialects.HIVE.equalsIgnoreCase(dialect);
+    if (!hiveDialect) {
+      // TODO(design-docs/gravitino-logical-view-management.md): support loading trino/spark HMS
+      // views.
+      String message =
+          String.format(
+              "Hive catalog currently supports only '%s' view dialect, but found '%s' for view %s",
+              Dialects.HIVE, dialect, ident);
+      throw new UnsupportedOperationException(message);
+    }
+
+    SQLRepresentation hiveSql =
+        SQLRepresentation.builder()
+            .withDialect(Dialects.HIVE)
+            .withSql(StringUtils.defaultString(viewOriginalText))
+            .build();
+    SQLRepresentation[] representations = new SQLRepresentation[] {hiveSql};
+
+    return HiveView.builder()
+        .withName(ident.name())
+        .withComment(comment)
+        .withColumns(copyColumns(columns))
+        .withRepresentations(representations)
+        .withProperties(viewProperties)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
+  /**
+   * Checks that a view definition is expressible in this catalog and returns its single SQL
+   * representation.
+   *
+   * @param representations candidate representations; must contain exactly one element and that
+   *     element must be a {@link SQLRepresentation}
+   * @param defaultCatalog must be {@code null} for the Hive dialect
+   * @param defaultSchema must be {@code null} for the Hive dialect
+   * @param ident identifier of the view, used only in error messages
+   * @return the validated Hive-dialect representation
+   * @throws IllegalArgumentException if the representation count or type is wrong, or if a default
+   *     catalog/schema is supplied together with the Hive dialect
+   * @throws UnsupportedOperationException if the representation is not in the Hive dialect
+   */
+  private SQLRepresentation validateSQLRepresentation(
+      Representation[] representations,
+      String defaultCatalog,
+      String defaultSchema,
+      NameIdentifier ident) {
+    int total = 0;
+    Representation head = null;
+    if (representations != null && representations.length > 0) {
+      total = representations.length;
+      head = representations[0];
+    }
+    String headType = head == null ? "null" : head.getClass().getSimpleName();
+    Preconditions.checkArgument(
+        total == 1 && head instanceof SQLRepresentation,
+        "Hive catalog requires exactly one SQL representation for view %s, but got %s"
+            + " representation(s), first representation type is %s",
+        ident,
+        total,
+        headType);
+
+    SQLRepresentation selected = (SQLRepresentation) head;
+    if (!Dialects.HIVE.equalsIgnoreCase(selected.dialect())) {
+      // TODO(design-docs/gravitino-logical-view-management.md): support creating trino/spark HMS
+      // views.
+      throw new UnsupportedOperationException(
+          String.format(
+              "Hive catalog currently supports only '%s' view dialect, but got '%s' for view %s",
+              Dialects.HIVE, selected.dialect(), ident));
+    }
+
+    boolean noDefaults = defaultCatalog == null && defaultSchema == null;
+    Preconditions.checkArgument(
+        noDefaults,
+        "Hive dialect '%s' does not support non-null defaultCatalog/defaultSchema, but got "
+            + "defaultCatalog=%s, defaultSchema=%s for view %s",
+        Dialects.HIVE,
+        defaultCatalog,
+        defaultSchema,
+        ident);
+    return selected;
+  }
+
+  /**
+   * Returns the SQL text to store for a view in the Hive Metastore.
+   *
+   * @param representation the representation to serialize; only the Hive dialect is accepted
+   * @param ident identifier of the view, used only in the error message
+   * @return the SQL of {@code representation}, unchanged
+   * @throws UnsupportedOperationException if the representation is not in the Hive dialect
+   */
+  private String toHmsViewOriginalText(SQLRepresentation representation, NameIdentifier ident) {
+    if (Dialects.HIVE.equalsIgnoreCase(representation.dialect())) {
+      return representation.sql();
+    }
+    // TODO(design-docs/gravitino-logical-view-management.md): support serializing trino/spark HMS
+    // view definitions.
+    throw new UnsupportedOperationException(
+        String.format(
+            "Hive catalog currently supports only '%s' view dialect, but got '%s' for view %s",
+            Dialects.HIVE, representation.dialect(), ident));
+  }
+
+  /**
+   * Finds the name a view will carry after {@code changes} are applied.
+   *
+   * @param originalName the current view name, returned when no rename is requested
+   * @param changes the requested changes, scanned in order
+   * @return the new name from the first {@link ViewChange.RenameView} in {@code changes}, or
+   *     {@code originalName} if there is none
+   */
+  private String extractRenameTargetName(String originalName, ViewChange[] changes) {
+    for (int i = 0; i < changes.length; i++) {
+      ViewChange candidate = changes[i];
+      if (!(candidate instanceof ViewChange.RenameView)) {
+        continue;
+      }
+      ViewChange.RenameView rename = (ViewChange.RenameView) candidate;
+      return rename.getNewName();
+    }
+    return originalName;
+  }
+
+  /**
+   * Returns a shallow copy of {@code columns}, substituting an empty array for {@code null}.
+   *
+   * @param columns the array to copy, possibly {@code null}
+   * @return a new array holding the same elements; never {@code null}
+   */
+  private Column[] copyColumns(Column[] columns) {
+    if (columns == null) {
+      return new Column[0];
+    }
+    return columns.clone();
+  }
+
+  /** Returns whatever client pool {@code clientPoolSupplier} currently provides. */
+  private CachedClientPool clientPool() {
+    CachedClientPool pool = clientPoolSupplier.get();
+    return pool;
+  }
+
+  /** Returns whatever catalog name {@code catalogNameSupplier} currently provides. */
+  private String catalogName() {
+    String name = catalogNameSupplier.get();
+    return name;
+  }
+}
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
index f0a8debac2..c6c4b313a2 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
@@ -36,29 +36,45 @@ import static 
org.apache.gravitino.catalog.hive.HiveCatalogPropertiesMetadata.PR
 import static 
org.apache.gravitino.catalog.hive.TestHiveCatalog.HIVE_PROPERTIES_METADATA;
 import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyShort;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.exceptions.ConnectionFailedException;
+import org.apache.gravitino.exceptions.ViewAlreadyExistsException;
 import org.apache.gravitino.hive.CachedClientPool;
 import org.apache.gravitino.hive.HiveSchema;
 import org.apache.gravitino.hive.HiveTable;
 import org.apache.gravitino.hive.client.HiveClient;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Representation;
+import org.apache.gravitino.rel.SQLRepresentation;
+import org.apache.gravitino.rel.View;
+import org.apache.gravitino.rel.ViewChange;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.ClientPool;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.thrift.TException;
@@ -197,4 +213,724 @@ class TestHiveCatalogOperations {
     HiveTable createdTable = hiveTableCaptor.getValue();
     Assertions.assertEquals(0, createdTable.columns().length);
   }
+
+  /** createView with a single "trino" SQL representation must fail as unsupported. */
+  @Test
+  void testCreateViewRejectsTrinoDialect() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+    when(hiveClient.getDatabase(anyString(), anyString())).thenReturn(schema);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () ->
+                op.createView(
+                    NameIdentifier.of("db", "v_trino"),
+                    null,
+                    new Column[0],
+                    new SQLRepresentation[] {
+                      
SQLRepresentation.builder().withDialect("trino").withSql("SELECT 1").build()
+                    },
+                    null,
+                    null,
+                    Maps.newHashMap()));
+    Assertions.assertTrue(exception.getMessage().contains("supports only 
'hive'"));
+  }
+
+  /** createView must hand the columns and SQL text to HiveClient.createTable and return them. */
+  @Test
+  void testCreateViewPassesColumnsToHiveMetastore() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+    Column[] columns = {
+      Column.of("id", Types.LongType.get(), "id column"),
+      Column.of("name", Types.StringType.get(), "name column")
+    };
+    when(hiveClient.getDatabase(anyString(), anyString())).thenReturn(schema);
+
+    ArgumentCaptor<HiveTable> hiveTableCaptor = 
ArgumentCaptor.forClass(HiveTable.class);
+    doNothing().when(hiveClient).createTable(hiveTableCaptor.capture());
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    View created =
+        op.createView(
+            NameIdentifier.of("db", "v_hive"),
+            null,
+            columns,
+            new SQLRepresentation[] {
+              SQLRepresentation.builder()
+                  .withDialect("hive")
+                  .withSql("SELECT id, name FROM t")
+                  .build()
+            },
+            null,
+            null,
+            Maps.newHashMap());
+
+    Assertions.assertEquals(2, hiveTableCaptor.getValue().columns().length);
+    Assertions.assertEquals("id", 
hiveTableCaptor.getValue().columns()[0].name());
+    Assertions.assertEquals("name", 
hiveTableCaptor.getValue().columns()[1].name());
+    Assertions.assertEquals(
+        "SELECT id, name FROM t", 
hiveTableCaptor.getValue().viewOriginalText());
+    Assertions.assertEquals(2, created.columns().length);
+  }
+
+  /** An upper-case "HIVE" dialect must be accepted even when the default locale is tr-TR. */
+  @Test
+  void testCreateViewWithUppercaseHiveDialectUnderTurkishLocale() throws 
Exception {
+    Locale originalDefault = Locale.getDefault();
+    Locale.setDefault(Locale.forLanguageTag("tr-TR"));
+    try {
+      HiveCatalogOperations op = new HiveCatalogOperations();
+      op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+      CachedClientPool clientPool = mock(CachedClientPool.class);
+      HiveClient hiveClient = mock(HiveClient.class);
+      HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+      when(hiveClient.getDatabase(anyString(), 
anyString())).thenReturn(schema);
+      when(clientPool.run(any()))
+          .thenAnswer(
+              invocation -> {
+                ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+                return action.run(hiveClient);
+              });
+      op.clientPool = clientPool;
+
+      View created =
+          Assertions.assertDoesNotThrow(
+              () ->
+                  op.createView(
+                      NameIdentifier.of("db", "v_hive_upper"),
+                      null,
+                      new Column[0],
+                      new SQLRepresentation[] {
+                        
SQLRepresentation.builder().withDialect("HIVE").withSql("SELECT 1").build()
+                      },
+                      null,
+                      null,
+                      Maps.newHashMap()));
+
+      SQLRepresentation representation = (SQLRepresentation) 
created.representations()[0];
+      Assertions.assertEquals("SELECT 1", representation.sql());
+    } finally {
+      Locale.setDefault(originalDefault);
+    }
+  }
+
+  /** createView with a Hive-dialect view and non-null default catalog/schema must be rejected. */
+  @Test
+  void testCreateViewRejectsNonNullDefaultCatalogAndSchema() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+    when(hiveClient.getDatabase(anyString(), anyString())).thenReturn(schema);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                op.createView(
+                    NameIdentifier.of("db", "v_hive"),
+                    null,
+                    new Column[0],
+                    new SQLRepresentation[] {
+                      
SQLRepresentation.builder().withDialect("hive").withSql("SELECT 1").build()
+                    },
+                    "analytics",
+                    "mart",
+                    Maps.newHashMap(ImmutableMap.of("created_by", "test"))));
+
+    Assertions.assertTrue(
+        exception.getMessage().contains("does not support non-null 
defaultCatalog/defaultSchema"));
+  }
+
+  /** createView with a single "spark" SQL representation must fail as unsupported. */
+  @Test
+  void testCreateViewRejectsSparkDialect() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+    when(hiveClient.getDatabase(anyString(), anyString())).thenReturn(schema);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () ->
+                op.createView(
+                    NameIdentifier.of("db", "v_spark"),
+                    null,
+                    new Column[0],
+                    new SQLRepresentation[] {
+                      
SQLRepresentation.builder().withDialect("spark").withSql("SELECT 1").build()
+                    },
+                    null,
+                    null,
+                    Maps.newHashMap()));
+    Assertions.assertTrue(exception.getMessage().contains("supports only 
'hive'"));
+  }
+
+  /** createView must reject a representation that is not a SQLRepresentation. */
+  @Test
+  void testCreateViewRejectsNonSqlRepresentation() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+    when(hiveClient.getDatabase(anyString(), anyString())).thenReturn(schema);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                op.createView(
+                    NameIdentifier.of("db", "v_non_sql"),
+                    null,
+                    new Column[0],
+                    new Representation[] {() -> "custom"},
+                    null,
+                    null,
+                    Maps.newHashMap()));
+
+    Assertions.assertTrue(exception.getMessage().contains("exactly one SQL 
representation"));
+  }
+
+  /** createView must reject more than one representation, even if all are Hive-dialect SQL. */
+  @Test
+  void testCreateViewRejectsMultipleRepresentations() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveSchema schema = 
HiveSchema.builder().withCatalogName("hive").withName("db").build();
+    when(hiveClient.getDatabase(anyString(), anyString())).thenReturn(schema);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                op.createView(
+                    NameIdentifier.of("db", "v_multi_reps"),
+                    null,
+                    new Column[0],
+                    new Representation[] {
+                      
SQLRepresentation.builder().withDialect("hive").withSql("SELECT 1").build(),
+                      
SQLRepresentation.builder().withDialect("hive").withSql("SELECT 2").build()
+                    },
+                    null,
+                    null,
+                    Maps.newHashMap()));
+
+    Assertions.assertTrue(exception.getMessage().contains("exactly one SQL 
representation"));
+  }
+
+  /** loadView must fail as unsupported for a view whose properties carry presto_view=true. */
+  @Test
+  void testLoadViewRejectsTrinoDialect() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    when(hiveClient.getTable(anyString(), anyString(), anyString()))
+        .thenReturn(
+            HiveTable.builder()
+                .withName("v_trino")
+                .withCatalogName("hive")
+                .withDatabaseName("db")
+                .withColumns(new Column[0])
+                .withProperties(
+                    Maps.newHashMap(
+                        ImmutableMap.of(
+                            HiveConstants.TABLE_TYPE,
+                            TableType.VIRTUAL_VIEW.name(),
+                            "presto_view",
+                            "true")))
+                .withViewOriginalText("SELECT 1")
+                .build());
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () -> op.loadView(NameIdentifier.of("db", "v_trino")));
+    Assertions.assertTrue(exception.getMessage().contains("supports only 
'hive'"));
+  }
+
+  /** loadView must expose the columns returned by HiveClient.getTable, in the same order. */
+  @Test
+  void testLoadViewReturnsColumnsFromHiveMetastore() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    Column[] columns = {
+      Column.of("id", Types.LongType.get(), "id column"),
+      Column.of("name", Types.StringType.get(), "name column")
+    };
+    when(hiveClient.getTable(anyString(), anyString(), anyString()))
+        .thenReturn(
+            HiveTable.builder()
+                .withName("v_hive")
+                .withCatalogName("hive")
+                .withDatabaseName("db")
+                .withColumns(columns)
+                .withProperties(
+                    Maps.newHashMap(
+                        ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+                .withViewOriginalText("SELECT id, name FROM t")
+                .build());
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    View loaded = op.loadView(NameIdentifier.of("db", "v_hive"));
+
+    Assertions.assertEquals(2, loaded.columns().length);
+    Assertions.assertEquals("id", loaded.columns()[0].name());
+    Assertions.assertEquals("name", loaded.columns()[1].name());
+  }
+
+  /** loadView must return the stored view original text as the SQL of its representation. */
+  @Test
+  void testLoadViewUsesOriginalText() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    when(hiveClient.getTable(anyString(), anyString(), anyString()))
+        .thenReturn(
+            HiveTable.builder()
+                .withName("v_hive")
+                .withCatalogName("hive")
+                .withDatabaseName("db")
+                .withColumns(new Column[0])
+                .withProperties(
+                    Maps.newHashMap(
+                        ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+                .withViewOriginalText("SELECT id, name FROM t")
+                .build());
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    View loaded = op.loadView(NameIdentifier.of("db", "v_hive"));
+
+    SQLRepresentation representation = (SQLRepresentation) 
loaded.representations()[0];
+    Assertions.assertEquals("SELECT id, name FROM t", representation.sql());
+  }
+
+  /** alterView with a replaceView change carrying a "trino" representation must be rejected. */
+  @Test
+  void testAlterViewReplaceRejectsTrinoDialect() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveTable currentTable =
+        HiveTable.builder()
+            .withName("v_hive")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(new Column[0])
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+            .withViewOriginalText("SELECT 1")
+            .build();
+    when(hiveClient.getTable(anyString(), anyString(), 
anyString())).thenReturn(currentTable);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    UnsupportedOperationException exception =
+        Assertions.assertThrows(
+            UnsupportedOperationException.class,
+            () ->
+                op.alterView(
+                    NameIdentifier.of("db", "v_hive"),
+                    ViewChange.replaceView(
+                        new Column[0],
+                        new SQLRepresentation[] {
+                          SQLRepresentation.builder()
+                              .withDialect("trino")
+                              .withSql("SELECT 2")
+                              .build()
+                        },
+                        null,
+                        null,
+                        null)));
+    Assertions.assertTrue(exception.getMessage().contains("supports only 
'hive'"));
+  }
+
+  /** A replaceView change must send the new columns, SQL and comment to HiveClient.alterTable. */
+  @Test
+  void testAlterViewReplacePassesReplacementColumnsToHiveMetastore() throws 
Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    Column[] currentColumns = {Column.of("id", Types.LongType.get(), "id 
column")};
+    Column[] replacementColumns = {
+      Column.of("id", Types.LongType.get(), "id column"),
+      Column.of("name", Types.StringType.get(), "name column")
+    };
+    HiveTable currentTable =
+        HiveTable.builder()
+            .withName("v_hive")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(currentColumns)
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+            .withViewOriginalText("SELECT id FROM t")
+            .build();
+    when(hiveClient.getTable(anyString(), anyString(), 
anyString())).thenReturn(currentTable);
+
+    ArgumentCaptor<HiveTable> hiveTableCaptor = 
ArgumentCaptor.forClass(HiveTable.class);
+    doNothing()
+        .when(hiveClient)
+        .alterTable(anyString(), anyString(), anyString(), 
hiveTableCaptor.capture());
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    View updated =
+        op.alterView(
+            NameIdentifier.of("db", "v_hive"),
+            ViewChange.replaceView(
+                replacementColumns,
+                new SQLRepresentation[] {
+                  SQLRepresentation.builder()
+                      .withDialect("hive")
+                      .withSql("SELECT id, name FROM t")
+                      .build()
+                },
+                null,
+                null,
+                "new comment"));
+
+    Assertions.assertEquals(2, hiveTableCaptor.getValue().columns().length);
+    Assertions.assertEquals("id", 
hiveTableCaptor.getValue().columns()[0].name());
+    Assertions.assertEquals("name", 
hiveTableCaptor.getValue().columns()[1].name());
+    Assertions.assertEquals(
+        "SELECT id, name FROM t", 
hiveTableCaptor.getValue().viewOriginalText());
+    Assertions.assertEquals("new comment", 
hiveTableCaptor.getValue().comment());
+    Assertions.assertEquals(2, updated.columns().length);
+    Assertions.assertEquals("new comment", updated.comment());
+  }
+
+  /** Renaming a view to the name of an existing view must throw ViewAlreadyExistsException. */
+  @Test
+  void testAlterViewRenameThrowsWhenTargetViewExists() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveTable sourceView =
+        HiveTable.builder()
+            .withName("v_source")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(new Column[0])
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+            .withViewOriginalText("SELECT 1")
+            .build();
+    HiveTable targetView =
+        HiveTable.builder()
+            .withName("v_target")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(new Column[0])
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+            .withViewOriginalText("SELECT 2")
+            .build();
+
+    when(hiveClient.getTable(anyString(), anyString(), anyString()))
+        .thenAnswer(
+            invocation -> {
+              String tableName = invocation.getArgument(2);
+              if ("v_source".equals(tableName)) {
+                return sourceView;
+              }
+              if ("v_target".equals(tableName)) {
+                return targetView;
+              }
+              throw new RuntimeException("NoSuchObjectException: unknown table 
" + tableName);
+            });
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    ViewAlreadyExistsException exception =
+        Assertions.assertThrows(
+            ViewAlreadyExistsException.class,
+            () -> op.alterView(NameIdentifier.of("db", "v_source"), 
ViewChange.rename("v_target")));
+    Assertions.assertTrue(exception.getMessage().contains("already exists"));
+  }
+
+  /** A replaceView change with non-null default catalog/schema must be rejected for Hive SQL. */
+  @Test
+  void testAlterViewReplaceRejectsNonNullDefaultCatalogAndSchema() throws 
Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveTable currentTable =
+        HiveTable.builder()
+            .withName("v_hive")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(new Column[0])
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+            .withViewOriginalText("SELECT 1")
+            .build();
+    when(hiveClient.getTable(anyString(), anyString(), 
anyString())).thenReturn(currentTable);
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    IllegalArgumentException exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                op.alterView(
+                    NameIdentifier.of("db", "v_hive"),
+                    ViewChange.replaceView(
+                        new Column[0],
+                        new SQLRepresentation[] {
+                          SQLRepresentation.builder()
+                              .withDialect("hive")
+                              .withSql("SELECT 2")
+                              .build()
+                        },
+                        "analytics",
+                        "mart",
+                        null)));
+
+    Assertions.assertTrue(
+        exception.getMessage().contains("does not support non-null 
defaultCatalog/defaultSchema"));
+  }
+
+  /**
+   * Checks that a {@link RuntimeException} raised by the client's {@code dropTable} while
+   * dropping an existing view surfaces from {@code dropView} as a {@link RuntimeException}
+   * whose message contains "Failed to drop Hive view".
+   */
+  @Test
+  void testDropViewThrowsForUnexpectedException() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveTable currentView =
+        HiveTable.builder()
+            .withName("v1")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(new Column[0])
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, 
TableType.VIRTUAL_VIEW.name())))
+            .withViewOriginalText("SELECT 1")
+            .build();
+    when(hiveClient.getTable(anyString(), anyString(), 
anyString())).thenReturn(currentView);
+    // Simulate an unexpected failure from the metastore client when the drop is issued.
+    doAnswer(
+            invocation -> {
+              throw new RuntimeException("permission denied");
+            })
+        .when(hiveClient)
+        .dropTable(anyString(), anyString(), anyString(), anyBoolean(), 
anyBoolean());
+    // Run every pooled action directly against the mocked HiveClient.
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    RuntimeException exception =
+        Assertions.assertThrows(
+            RuntimeException.class, () -> op.dropView(NameIdentifier.of("db", 
"v1")));
+    Assertions.assertTrue(exception.getMessage().contains("Failed to drop Hive 
view"));
+  }
+
+  /**
+   * Checks that, with {@code LIST_ALL_TABLES} set to "true", {@code listTables} removes view
+   * names from the result: the client reports tables "v1" and "t1" and reports "v1" as a
+   * VIRTUAL_VIEW, so only "t1" is expected. Also verifies that the type-based listing is used
+   * and that {@code listTableNamesByFilter} is never called.
+   */
+  @Test
+  void testListTablesFiltersViewsWithHiveTableTypeField() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(ImmutableMap.of(LIST_ALL_TABLES, "true"), null, 
HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    when(hiveClient.getAllDatabases(anyString())).thenReturn(List.of("db"));
+    when(hiveClient.getAllTables(anyString(), anyString()))
+        .thenReturn(new ArrayList<>(List.of("v1", "t1")));
+    when(hiveClient.listTablesByType(anyString(), anyString(), anyString(), 
anyString()))
+        .thenReturn(List.of("v1"));
+    // Run every pooled action directly against the mocked HiveClient.
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    NameIdentifier[] tables = op.listTables(Namespace.of("metalake", "hive", 
"db"));
+
+    Assertions.assertEquals(1, tables.length);
+    Assertions.assertEquals("t1", tables[0].name());
+    verify(hiveClient).getAllTables(eq("hive"), eq("db"));
+    verify(hiveClient)
+        .listTablesByType(eq("hive"), eq("db"), eq("*"), 
eq(TableType.VIRTUAL_VIEW.name()));
+    verify(hiveClient, never())
+        .listTableNamesByFilter(anyString(), anyString(), anyString(), 
anyShort());
+  }
+
+  /**
+   * Checks that {@code listViews} returns the two names reported by the client's
+   * {@code listTablesByType} call for VIRTUAL_VIEW, and that it does so without calling
+   * {@code getAllTables}, {@code getTable} or {@code listTableNamesByFilter}.
+   */
+  @Test
+  void testListViewsUsesTypeListing() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(ImmutableMap.of(LIST_ALL_TABLES, "true"), null, 
HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    when(hiveClient.getAllDatabases(anyString())).thenReturn(List.of("db"));
+    when(hiveClient.listTablesByType(anyString(), anyString(), anyString(), 
anyString()))
+        .thenReturn(List.of("v1", "v2"));
+    // Run every pooled action directly against the mocked HiveClient.
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = 
invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    NameIdentifier[] views = op.listViews(Namespace.of("metalake", "hive", 
"db"));
+
+    Assertions.assertEquals(2, views.length);
+    verify(hiveClient)
+        .listTablesByType(eq("hive"), eq("db"), eq("*"), 
eq(TableType.VIRTUAL_VIEW.name()));
+    verify(hiveClient, never()).getAllTables(anyString(), anyString());
+    verify(hiveClient, never()).getTable(anyString(), anyString(), 
anyString());
+    verify(hiveClient, never())
+        .listTableNamesByFilter(anyString(), anyString(), anyString(), 
anyShort());
+  }
+
+  /**
+   * Checks that {@code dropView} returns {@code false} and never issues a drop when the
+   * identifier resolves to a MANAGED_TABLE rather than a view.
+   */
+  @Test
+  void testDropViewReturnsFalseWhenTargetIsTable() throws Exception {
+    HiveCatalogOperations op = new HiveCatalogOperations();
+    op.initialize(Maps.newHashMap(), null, HIVE_PROPERTIES_METADATA);
+
+    CachedClientPool clientPool = mock(CachedClientPool.class);
+    HiveClient hiveClient = mock(HiveClient.class);
+    HiveTable table =
+        HiveTable.builder()
+            .withName("t1")
+            .withCatalogName("hive")
+            .withDatabaseName("db")
+            .withColumns(new Column[0])
+            .withProperties(
+                Maps.newHashMap(
+                    ImmutableMap.of(HiveConstants.TABLE_TYPE, TableType.MANAGED_TABLE.name())))
+            .build();
+    when(hiveClient.getTable(anyString(), anyString(), anyString())).thenReturn(table);
+    // Fail fast if the code under test tries to drop the table.
+    doAnswer(
+            invocation -> {
+              throw new AssertionError("dropTable should not be called for non-view objects");
+            })
+        .when(hiveClient)
+        .dropTable(anyString(), anyString(), anyString(), anyBoolean(), anyBoolean());
+    // Run every pooled action directly against the mocked HiveClient.
+    when(clientPool.run(any()))
+        .thenAnswer(
+            invocation -> {
+              ClientPool.Action<?, HiveClient, ?> action = invocation.getArgument(0);
+              return action.run(hiveClient);
+            });
+    op.clientPool = clientPool;
+
+    boolean dropped = op.dropView(NameIdentifier.of("db", "t1"));
+    Assertions.assertFalse(dropped);
+    // The AssertionError thrown by the stub could be swallowed by a broad catch in the code
+    // under test, so also verify explicitly that no drop was attempted.
+    verify(hiveClient, never())
+        .dropTable(anyString(), anyString(), anyString(), anyBoolean(), anyBoolean());
+  }
 }
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveView.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveView.java
new file mode 100644
index 0000000000..1b6a6af725
--- /dev/null
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveView.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hive;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.rel.Dialects;
+import org.apache.gravitino.rel.SQLRepresentation;
+import org.junit.jupiter.api.Test;
+
+class TestHiveView {
+
+  private static SQLRepresentation[] hiveRepresentations() {
+    return new SQLRepresentation[] {
+      SQLRepresentation.builder().withDialect(Dialects.HIVE).withSql("select 
1").build()
+    };
+  }
+
+  @Test
+  void testBuildWithoutPropertiesReturnsEmptyMap() {
+    HiveView view =
+        
HiveView.builder().withName("v1").withRepresentations(hiveRepresentations()).build();
+
+    assertNotNull(view.properties());
+    assertTrue(view.properties().isEmpty());
+  }
+
+  @Test
+  void testBuildCopiesPropertiesMap() {
+    Map<String, String> sourceProperties = new HashMap<>();
+    sourceProperties.put("k1", "v1");
+
+    HiveView view =
+        HiveView.builder()
+            .withName("v1")
+            .withRepresentations(hiveRepresentations())
+            .withProperties(sourceProperties)
+            .build();
+    sourceProperties.put("k2", "v2");
+
+    assertEquals(1, view.properties().size());
+    assertEquals("v1", view.properties().get("k1"));
+  }
+
+  @Test
+  void testBuildWithoutValidNameThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> 
HiveView.builder().withRepresentations(hiveRepresentations()).build());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> 
HiveView.builder().withName("").withRepresentations(hiveRepresentations()).build());
+  }
+
+  @Test
+  void testBuildWithoutRepresentationsThrows() {
+    assertThrows(IllegalArgumentException.class, () -> 
HiveView.builder().withName("v1").build());
+  }
+}
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveViewIT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveViewIT.java
new file mode 100644
index 0000000000..630c2e472b
--- /dev/null
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveViewIT.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hive.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchViewException;
+import org.apache.gravitino.exceptions.ViewAlreadyExistsException;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.SQLRepresentation;
+import org.apache.gravitino.rel.View;
+import org.apache.gravitino.rel.ViewCatalog;
+import org.apache.gravitino.rel.ViewChange;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.Container.ExecResult;
+
+/** Integration tests for Hive catalog view CRUD operations. */
+@Tag("gravitino-docker-test")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CatalogHiveViewIT extends BaseIT {
+
+  private static final String PROVIDER = "hive";
+  private static final String VIEW_COMMENT = "hive view comment";
+  private static final String HIVE_DIALECT = "hive";
+
+  private final ContainerSuite containerSuite = ContainerSuite.getInstance();
+  private String metalakeName;
+  private String catalogName;
+  private String schemaName;
+
+  private GravitinoMetalake metalake;
+  private Catalog catalog;
+  private ViewCatalog viewCatalog;
+
+  /**
+   * Starts the integration-test environment and a Hive container (HIVE2 runtime), then creates
+   * the randomly named metalake, Hive catalog (pointing at the container's metastore URI) and
+   * schema that the tests in this class share.
+   *
+   * @throws Exception if starting the environment fails.
+   */
+  @BeforeAll
+  public void setup() throws Exception {
+    super.startIntegrationTest();
+    containerSuite.startHiveContainer(
+        ImmutableMap.of(HiveContainer.HIVE_RUNTIME_VERSION, 
HiveContainer.HIVE2));
+
+    metalakeName = GravitinoITUtils.genRandomName("hive_view_it_metalake");
+    catalogName = GravitinoITUtils.genRandomName("hive_view_it_catalog");
+    schemaName = GravitinoITUtils.genRandomName("hive_view_it_schema");
+
+    client.createMetalake(metalakeName, "comment", Collections.emptyMap());
+    metalake = client.loadMetalake(metalakeName);
+
+    // Thrift URI of the metastore running inside the Hive container.
+    String hmsUri =
+        String.format(
+            "thrift://%s:%d",
+            containerSuite.getHiveContainer().getContainerIpAddress(),
+            HiveContainer.HIVE_METASTORE_PORT);
+
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(HiveConstants.METASTORE_URIS, hmsUri);
+
+    catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.RELATIONAL, PROVIDER, "comment", 
catalogProperties);
+    viewCatalog = catalog.asViewCatalog();
+    catalog.asSchemas().createSchema(schemaName, "schema comment", 
Maps.newHashMap());
+  }
+
+  /** Drops every view still listed in the test schema after each test. */
+  @AfterEach
+  public void cleanViews() {
+    for (NameIdentifier leftover : viewCatalog.listViews(Namespace.of(schemaName))) {
+      viewCatalog.dropView(leftover);
+    }
+  }
+
+  /**
+   * Removes the schema, catalog and metalake created in {@code setup()} and then stops the
+   * integration-test environment. The stop runs in a {@code finally} block, so it happens even
+   * when one of the cleanup calls throws.
+   *
+   * @throws Exception if cleanup or stopping the environment fails.
+   */
+  @AfterAll
+  public void teardown() throws Exception {
+    try {
+      catalog.asSchemas().dropSchema(schemaName, true);
+      metalake.disableCatalog(catalogName);
+      metalake.dropCatalog(catalogName, true);
+      client.disableMetalake(metalakeName);
+      client.dropMetalake(metalakeName);
+    } finally {
+      super.stopIntegrationTest();
+    }
+  }
+
+  // Suppress parent-class lifecycle hooks to control lifecycle ourselves: setup() and teardown()
+  // call super.startIntegrationTest() / super.stopIntegrationTest() explicitly, so the two
+  // overrides below are intentionally empty.
+  @Override
+  @BeforeAll
+  public void startIntegrationTest() {}
+
+  @Override
+  @AfterAll
+  public void stopIntegrationTest() {}
+
+  @Test
+  public void testCreateAndLoadView() {
+    Column[] columns = {
+      Column.of("id", Types.LongType.get(), "id column"),
+      Column.of("name", Types.StringType.get(), "name column")
+    };
+    String viewName = GravitinoITUtils.genRandomName("hive_test_view");
+    View created =
+        viewCatalog.createView(
+            NameIdentifier.of(schemaName, viewName),
+            VIEW_COMMENT,
+            columns,
+            new SQLRepresentation[] {hiveRep("SELECT id, name FROM 
some_table")},
+            null,
+            null,
+            Collections.singletonMap("created_by", "test"));
+
+    Assertions.assertEquals(viewName, created.name());
+    Assertions.assertEquals(VIEW_COMMENT, created.comment());
+    Assertions.assertEquals(1, created.representations().length);
+    Assertions.assertInstanceOf(SQLRepresentation.class, 
created.representations()[0]);
+    Assertions.assertEquals(
+        "SELECT id, name FROM some_table",
+        ((SQLRepresentation) created.representations()[0]).sql());
+    Assertions.assertEquals(
+        HIVE_DIALECT, ((SQLRepresentation) 
created.representations()[0]).dialect());
+
+    View loaded = viewCatalog.loadView(NameIdentifier.of(schemaName, 
viewName));
+    Assertions.assertEquals(viewName, loaded.name());
+    Assertions.assertEquals(VIEW_COMMENT, loaded.comment());
+    Assertions.assertEquals(1, loaded.representations().length);
+    Assertions.assertEquals("test", loaded.properties().get("created_by"));
+  }
+
+  /** Creates two views and checks that both names appear in the schema's view listing. */
+  @Test
+  public void testListViews() {
+    String firstName = GravitinoITUtils.genRandomName("hive_list_view1");
+    String secondName = GravitinoITUtils.genRandomName("hive_list_view2");
+    Column[] columns = {Column.of("c1", Types.StringType.get(), null)};
+
+    // Both views share the same definition; only the name differs.
+    for (String name : new String[] {firstName, secondName}) {
+      viewCatalog.createView(
+          NameIdentifier.of(schemaName, name),
+          null,
+          columns,
+          new SQLRepresentation[] {hiveRep("SELECT c1 FROM t")},
+          null,
+          null,
+          Collections.emptyMap());
+    }
+
+    NameIdentifier[] listed = viewCatalog.listViews(Namespace.of(schemaName));
+    Assertions.assertTrue(listed.length >= 2);
+
+    boolean sawFirst = false;
+    boolean sawSecond = false;
+    for (NameIdentifier ident : listed) {
+      String listedName = ident.name();
+      sawFirst = sawFirst || listedName.equals(firstName);
+      sawSecond = sawSecond || listedName.equals(secondName);
+    }
+    Assertions.assertTrue(sawFirst, "view1 not found in list");
+    Assertions.assertTrue(sawSecond, "view2 not found in list");
+  }
+
+  /** Listing views in a schema that was never created must raise NoSuchSchemaException. */
+  @Test
+  public void testListViewsInNonExistentSchema() {
+    Assertions.assertThrows(
+        NoSuchSchemaException.class,
+        () -> viewCatalog.listViews(Namespace.of("non_existent_schema_xyz")));
+  }
+
+  /** {@code viewExists} must be false before the view is created and true afterwards. */
+  @Test
+  public void testViewExists() {
+    String viewName = GravitinoITUtils.genRandomName("hive_exists_view");
+    NameIdentifier ident = NameIdentifier.of(schemaName, viewName);
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    Assertions.assertFalse(viewCatalog.viewExists(ident));
+    viewCatalog.createView(
+        ident,
+        null,
+        columns,
+        new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+        null,
+        null,
+        Collections.emptyMap());
+    Assertions.assertTrue(viewCatalog.viewExists(ident));
+  }
+
+  /**
+   * Dropping an existing view must return true and make the view disappear; dropping it a
+   * second time must return false.
+   */
+  @Test
+  public void testDropView() {
+    String viewName = GravitinoITUtils.genRandomName("hive_drop_view");
+    NameIdentifier ident = NameIdentifier.of(schemaName, viewName);
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    viewCatalog.createView(
+        ident,
+        null,
+        columns,
+        new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+        null,
+        null,
+        Collections.emptyMap());
+    Assertions.assertTrue(viewCatalog.viewExists(ident));
+
+    Assertions.assertTrue(viewCatalog.dropView(ident));
+    Assertions.assertFalse(viewCatalog.viewExists(ident));
+
+    // Dropping a non-existent view should return false
+    Assertions.assertFalse(viewCatalog.dropView(ident));
+  }
+
+  /**
+   * Renaming a view must return a view carrying the new name, leave nothing under the old name
+   * and make the view reachable under the new name.
+   */
+  @Test
+  public void testAlterViewRename() {
+    String viewName = GravitinoITUtils.genRandomName("hive_rename_view");
+    String newName = GravitinoITUtils.genRandomName("hive_renamed_view");
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    viewCatalog.createView(
+        NameIdentifier.of(schemaName, viewName),
+        null,
+        columns,
+        new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+        null,
+        null,
+        Collections.emptyMap());
+
+    View renamed =
+        viewCatalog.alterView(NameIdentifier.of(schemaName, viewName), 
ViewChange.rename(newName));
+
+    Assertions.assertEquals(newName, renamed.name());
+    
Assertions.assertFalse(viewCatalog.viewExists(NameIdentifier.of(schemaName, 
viewName)));
+    Assertions.assertTrue(viewCatalog.viewExists(NameIdentifier.of(schemaName, 
newName)));
+  }
+
+  /**
+   * Setting a property through {@code alterView} must make it visible on the returned view, and
+   * removing it must make the lookup for that key return null.
+   */
+  @Test
+  public void testAlterViewSetAndRemoveProperty() {
+    String viewName = GravitinoITUtils.genRandomName("hive_prop_view");
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    viewCatalog.createView(
+        NameIdentifier.of(schemaName, viewName),
+        null,
+        columns,
+        new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+        null,
+        null,
+        Collections.emptyMap());
+
+    View withProp =
+        viewCatalog.alterView(
+            NameIdentifier.of(schemaName, viewName), 
ViewChange.setProperty("my_key", "my_value"));
+    Assertions.assertEquals("my_value", withProp.properties().get("my_key"));
+
+    View withoutProp =
+        viewCatalog.alterView(
+            NameIdentifier.of(schemaName, viewName), 
ViewChange.removeProperty("my_key"));
+    Assertions.assertNull(withoutProp.properties().get("my_key"));
+  }
+
+  /** Creating a view twice under the same identifier must raise ViewAlreadyExistsException. */
+  @Test
+  public void testCreateViewAlreadyExists() {
+    String viewName = GravitinoITUtils.genRandomName("hive_dup_view");
+    NameIdentifier ident = NameIdentifier.of(schemaName, viewName);
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    viewCatalog.createView(
+        ident,
+        null,
+        columns,
+        new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+        null,
+        null,
+        Collections.emptyMap());
+
+    // Second creation with identical arguments must be rejected.
+    Assertions.assertThrows(
+        ViewAlreadyExistsException.class,
+        () ->
+            viewCatalog.createView(
+                ident,
+                null,
+                columns,
+                new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+                null,
+                null,
+                Collections.emptyMap()));
+  }
+
+  /**
+   * Replacing a view's definition must return a view with the new comment and a single SQL
+   * representation holding the new SQL text.
+   */
+  @Test
+  public void testAlterViewReplace() {
+    String viewName = GravitinoITUtils.genRandomName("hive_replace_view");
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    viewCatalog.createView(
+        NameIdentifier.of(schemaName, viewName),
+        "original comment",
+        columns,
+        new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+        null,
+        null,
+        Collections.emptyMap());
+
+    String updatedSql = "SELECT id FROM t WHERE id > 10";
+    View updated =
+        viewCatalog.alterView(
+            NameIdentifier.of(schemaName, viewName),
+            ViewChange.replaceView(
+                columns,
+                new SQLRepresentation[] {hiveRep(updatedSql)},
+                null,
+                null,
+                "updated comment"));
+
+    Assertions.assertEquals("updated comment", updated.comment());
+    Assertions.assertEquals(1, updated.representations().length);
+    Assertions.assertInstanceOf(SQLRepresentation.class, 
updated.representations()[0]);
+    Assertions.assertEquals(updatedSql, ((SQLRepresentation) 
updated.representations()[0]).sql());
+  }
+
+  /** Loading a view that was never created must raise NoSuchViewException. */
+  @Test
+  public void testLoadNonExistentView() {
+    Assertions.assertThrows(
+        NoSuchViewException.class,
+        () -> viewCatalog.loadView(NameIdentifier.of(schemaName, 
"non_existent_view_xyz")));
+  }
+
+  /**
+   * Checks that {@code listViews} returns a view created in the schema but not a regular table
+   * created in the same schema.
+   */
+  @Test
+  public void testListViewsDoesNotIncludeTables() {
+    String tableName = GravitinoITUtils.genRandomName("hive_not_a_view_table");
+    String viewName = GravitinoITUtils.genRandomName("hive_actual_view");
+    NameIdentifier tableIdent = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = {Column.of("id", Types.LongType.get(), null)};
+
+    // Create a regular table
+    catalog
+        .asTableCatalog()
+        .createTable(tableIdent, columns, "a regular table", Collections.emptyMap());
+
+    try {
+      // Create a view
+      viewCatalog.createView(
+          NameIdentifier.of(schemaName, viewName),
+          null,
+          columns,
+          new SQLRepresentation[] {hiveRep("SELECT id FROM t")},
+          null,
+          null,
+          Collections.emptyMap());
+
+      NameIdentifier[] views = viewCatalog.listViews(Namespace.of(schemaName));
+      boolean found = false;
+      for (NameIdentifier v : views) {
+        Assertions.assertNotEquals(
+            tableName, v.name(), "listViews should not include regular tables");
+        if (v.name().equals(viewName)) {
+          found = true;
+        }
+      }
+      Assertions.assertTrue(found, "actual view not found in listViews");
+    } finally {
+      // Drop the table even when an assertion above fails: the @AfterEach hook of this class
+      // only drops views, so a leaked table would stay in the shared schema for later tests.
+      catalog.asTableCatalog().dropTable(tableIdent);
+    }
+  }
+
+  /**
+   * Creates a table and a commented view with the Hive CLI inside the container, then checks
+   * that Gravitino can load the view (name, comment, one Hive-dialect SQL representation whose
+   * lower-cased text mentions the select list and the source table) and that the view shows up
+   * in {@code listViews}. Both objects are dropped through the Hive CLI afterwards.
+   */
+  @Test
+  public void testLoadViewCreatedByHiveCli() {
+    String tableName = 
GravitinoITUtils.genRandomName("hive_client_source_table");
+    String viewName = 
GravitinoITUtils.genRandomName("hive_client_created_view");
+    String qualifiedTableName = qualifyHiveObject(tableName);
+    String qualifiedViewName = qualifyHiveObject(viewName);
+
+    try {
+      executeHiveSql(String.format("CREATE TABLE %s (id BIGINT, name STRING)", 
qualifiedTableName));
+      executeHiveSql(
+          String.format(
+              "CREATE VIEW %s COMMENT '%s' AS SELECT id, name FROM %s",
+              qualifiedViewName, VIEW_COMMENT, qualifiedTableName));
+
+      View loaded = viewCatalog.loadView(NameIdentifier.of(schemaName, 
viewName));
+      Assertions.assertEquals(viewName, loaded.name());
+      Assertions.assertEquals(VIEW_COMMENT, loaded.comment());
+      Assertions.assertEquals(1, loaded.representations().length);
+      Assertions.assertInstanceOf(SQLRepresentation.class, 
loaded.representations()[0]);
+
+      SQLRepresentation representation = (SQLRepresentation) 
loaded.representations()[0];
+      Assertions.assertEquals(HIVE_DIALECT, representation.dialect());
+      // Compare case-insensitively; only substrings of the stored SQL are checked.
+      String normalizedSql = representation.sql().toLowerCase(Locale.ROOT);
+      Assertions.assertTrue(normalizedSql.contains("select id, name"));
+      
Assertions.assertTrue(normalizedSql.contains(tableName.toLowerCase(Locale.ROOT)));
+
+      NameIdentifier[] views = viewCatalog.listViews(Namespace.of(schemaName));
+      boolean found = false;
+      for (NameIdentifier view : views) {
+        if (view.name().equals(viewName)) {
+          found = true;
+          break;
+        }
+      }
+      Assertions.assertTrue(found, "view created by Hive CLI not found in 
listViews");
+    } finally {
+      executeHiveSql(String.format("DROP VIEW IF EXISTS %s", 
qualifiedViewName));
+      executeHiveSql(String.format("DROP TABLE IF EXISTS %s", 
qualifiedTableName));
+    }
+  }
+
+  /**
+   * Creates a table and a view with the Hive CLI, then checks that the view loaded through
+   * Gravitino exposes exactly two columns named "id" and "name", in that order. Both objects
+   * are dropped through the Hive CLI afterwards.
+   */
+  @Test
+  public void testLoadViewCreatedByHiveCliReturnsColumns() {
+    String tableName = 
GravitinoITUtils.genRandomName("hive_client_cols_source");
+    String viewName = GravitinoITUtils.genRandomName("hive_client_cols_view");
+    String qualifiedTableName = qualifyHiveObject(tableName);
+    String qualifiedViewName = qualifyHiveObject(viewName);
+
+    try {
+      executeHiveSql(String.format("CREATE TABLE %s (id BIGINT, name STRING)", 
qualifiedTableName));
+      executeHiveSql(
+          String.format(
+              "CREATE VIEW %s AS SELECT id, name FROM %s", qualifiedViewName, 
qualifiedTableName));
+
+      View loaded = viewCatalog.loadView(NameIdentifier.of(schemaName, 
viewName));
+
+      Assertions.assertEquals(2, loaded.columns().length);
+      Assertions.assertEquals("id", loaded.columns()[0].name());
+      Assertions.assertEquals("name", loaded.columns()[1].name());
+    } finally {
+      executeHiveSql(String.format("DROP VIEW IF EXISTS %s", 
qualifiedViewName));
+      executeHiveSql(String.format("DROP TABLE IF EXISTS %s", 
qualifiedTableName));
+    }
+  }
+
+  @Test
+  public void testNativeHiveCreatedViewMetadataInHms() throws Exception {
+    String tableName = 
GravitinoITUtils.genRandomName("hive_native_meta_source");
+    String viewName = GravitinoITUtils.genRandomName("hive_native_meta_view");
+    String qualifiedTableName = qualifyHiveObject(tableName);
+    String qualifiedViewName = qualifyHiveObject(viewName);
+
+    try {
+      executeHiveSql(String.format("CREATE TABLE %s (id BIGINT, name STRING)", 
qualifiedTableName));
+      executeHiveSql(
+          String.format(
+              "CREATE VIEW %s COMMENT '%s' AS SELECT id, name FROM %s",
+              qualifiedViewName, VIEW_COMMENT, qualifiedTableName));
+
+      String describedMetadata = 
describeHiveObject(qualifiedViewName).toLowerCase(Locale.ROOT);
+      String normalizedMetadata = describedMetadata.replaceAll("\\s+", " 
").trim();
+
+      Assertions.assertTrue(normalizedMetadata.contains("table type: 
virtual_view"));
+      Assertions.assertTrue(normalizedMetadata.contains("view original 
text:"));
+      Assertions.assertTrue(normalizedMetadata.contains("view expanded 
text:"));
+      Assertions.assertTrue(normalizedMetadata.contains("hive view comment"));
+      Assertions.assertTrue(normalizedMetadata.contains("id bigint"));
+      Assertions.assertTrue(normalizedMetadata.contains("name string"));
+    } finally {
+      executeHiveSql(String.format("DROP VIEW IF EXISTS %s", 
qualifiedViewName));
+      executeHiveSql(String.format("DROP TABLE IF EXISTS %s", 
qualifiedTableName));
+    }
+  }
+
+  /**
+   * Creates and fills a two-row table with the Hive CLI, creates a view over it through
+   * Gravitino, and checks that {@code SELECT COUNT(*)} on the view run by the Hive CLI returns
+   * 2. Both objects are dropped through the Hive CLI afterwards.
+   */
+  @Test
+  public void testHiveCliCanReadViewCreatedByGravitino() {
+    String tableName = 
GravitinoITUtils.genRandomName("hive_client_read_source");
+    String viewName = GravitinoITUtils.genRandomName("hive_client_read_view");
+    String qualifiedTableName = qualifyHiveObject(tableName);
+    String qualifiedViewName = qualifyHiveObject(viewName);
+    Column[] columns = {
+      Column.of("id", Types.LongType.get(), "id column"),
+      Column.of("name", Types.StringType.get(), "name column")
+    };
+
+    try {
+      executeHiveSql(String.format("CREATE TABLE %s (id BIGINT, name STRING)", 
qualifiedTableName));
+      executeHiveSql(String.format("INSERT INTO TABLE %s VALUES (1, 'alice')", 
qualifiedTableName));
+      executeHiveSql(String.format("INSERT INTO TABLE %s VALUES (2, 'bob')", 
qualifiedTableName));
+
+      viewCatalog.createView(
+          NameIdentifier.of(schemaName, viewName),
+          VIEW_COMMENT,
+          columns,
+          new SQLRepresentation[] {
+            hiveRep(String.format("SELECT id, name FROM %s", 
qualifiedTableName))
+          },
+          null,
+          null,
+          Collections.emptyMap());
+
+      long rowCount = querySingleLong(String.format("SELECT COUNT(*) FROM %s", 
qualifiedViewName));
+      Assertions.assertEquals(2L, rowCount);
+    } finally {
+      executeHiveSql(String.format("DROP VIEW IF EXISTS %s", 
qualifiedViewName));
+      executeHiveSql(String.format("DROP TABLE IF EXISTS %s", 
qualifiedTableName));
+    }
+  }
+
+  /** Builds a SQL representation of {@code sql} tagged with the {@code HIVE_DIALECT} dialect. */
+  private SQLRepresentation hiveRep(String sql) {
+    return 
SQLRepresentation.builder().withDialect(HIVE_DIALECT).withSql(sql).build();
+  }
+
+  /** Prefixes {@code objectName} with the test schema name, separated by a dot. */
+  private String qualifyHiveObject(String objectName) {
+    return schemaName + "." + objectName;
+  }
+
+  /**
+   * Runs {@code sql} with {@code hive -S -e} inside the Hive container and fails the test when
+   * the exit code is not 0, reporting the SQL, stdout and stderr.
+   */
+  private void executeHiveSql(String sql) {
+    ExecResult result =
+        containerSuite.getHiveContainer().executeInContainer("hive", "-S", 
"-e", sql);
+    Assertions.assertEquals(
+        0,
+        result.getExitCode(),
+        String.format(
+            "Failed to execute SQL with hive cli. SQL: %s, stdout: %s, stderr: 
%s",
+            sql, result.getStdout(), result.getStderr()));
+  }
+
+  /**
+   * Runs {@code sql} with {@code hive -S -e} inside the Hive container, fails the test when the
+   * exit code is not 0, and returns the last stdout line that consists solely of an optionally
+   * negative integer.
+   *
+   * @param sql The query to run.
+   * @return The parsed numeric value.
+   * @throws IllegalStateException if no stdout line is a plain integer.
+   */
+  private long querySingleLong(String sql) {
+    ExecResult result =
+        containerSuite.getHiveContainer().executeInContainer("hive", "-S", 
"-e", sql);
+    Assertions.assertEquals(
+        0,
+        result.getExitCode(),
+        String.format(
+            "Failed to query SQL with hive cli. SQL: %s, stdout: %s, stderr: 
%s",
+            sql, result.getStdout(), result.getStderr()));
+
+    // Scan from the last line backwards so that any earlier non-result output is skipped.
+    String[] lines = result.getStdout().split("\\R");
+    for (int index = lines.length - 1; index >= 0; index--) {
+      String line = lines[index].trim();
+      if (line.matches("-?\\d+")) {
+        return Long.parseLong(line);
+      }
+    }
+    throw new IllegalStateException(
+        String.format(
+            "Cannot parse numeric result from query output. SQL: %s, stdout: 
%s",
+            sql, result.getStdout()));
+  }
+
+  /**
+   * Runs {@code DESCRIBE FORMATTED} on the given object with {@code hive -S -e} inside the Hive
+   * container, fails the test when the exit code is not 0, and returns the command's stdout.
+   *
+   * @param qualifiedObjectName The object name appended verbatim to the statement.
+   * @return The raw stdout of the command.
+   */
+  private String describeHiveObject(String qualifiedObjectName) {
+    ExecResult result =
+        containerSuite
+            .getHiveContainer()
+            .executeInContainer("hive", "-S", "-e", "DESCRIBE FORMATTED " + 
qualifiedObjectName);
+    Assertions.assertEquals(
+        0,
+        result.getExitCode(),
+        String.format(
+            "Failed to describe Hive object. Object: %s, stdout: %s, stderr: 
%s",
+            qualifiedObjectName, result.getStdout(), result.getStderr()));
+    return result.getStdout();
+  }
+}
diff --git 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
index b37485553d..89988f1aa9 100644
--- 
a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
+++ 
b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/HiveTable.java
@@ -48,9 +48,20 @@ public class HiveTable extends BaseTable {
   public static final String TABLE_TYPE_PROP = "table_type";
   private String catalogName;
   private String databaseName;
+  private String viewOriginalText;
 
   protected HiveTable() {}
 
+  /**
+   * Returns the original SQL text for this Hive VIRTUAL_VIEW, or {@code null} if this is not a
+   * view.
+   *
+   * @return The view SQL, or {@code null}.
+   */
+  public String viewOriginalText() {
+    return viewOriginalText;
+  }
+
   public String catalogName() {
     return catalogName;
   }
@@ -73,6 +84,7 @@ public class HiveTable extends BaseTable {
 
     private String catalogName;
     private String databaseName;
+    private String viewOriginalText;
 
     /** Creates a new instance of {@link Builder}. */
     private Builder() {}
@@ -99,6 +111,17 @@ public class HiveTable extends BaseTable {
       return this;
     }
 
+    /**
+     * Sets the original SQL text for views (VIRTUAL_VIEW table type).
+     *
+     * @param viewOriginalText The view SQL.
+     * @return This Builder instance.
+     */
+    public Builder withViewOriginalText(String viewOriginalText) {
+      this.viewOriginalText = viewOriginalText;
+      return this;
+    }
+
     /**
      * Internal method to build a HiveTable instance using the provided values.
      *
@@ -118,6 +141,7 @@ public class HiveTable extends BaseTable {
       hiveTable.catalogName = catalogName;
       hiveTable.databaseName = databaseName;
       hiveTable.proxyPlugin = proxyPlugin;
+      hiveTable.viewOriginalText = viewOriginalText;
 
       // HMS put table comment in parameters
       if (comment != null) {
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java
index 251d241c83..ab83ce4ea0 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClient.java
@@ -43,6 +43,9 @@ public interface HiveClient extends AutoCloseable {
 
   List<String> getAllTables(String catalogName, String databaseName);
 
+  List<String> listTablesByType(
+      String catalogName, String databaseName, String tablePattern, String tableType);
+
   List<String> listTableNamesByFilter(
       String catalogName, String databaseName, String filter, short pageSize);
 
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
index 809f356eec..25b32cef44 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveClientImpl.java
@@ -80,6 +80,12 @@ public class HiveClientImpl implements HiveClient {
     return shim.getAllTables(catalogName, databaseName);
   }
 
+  @Override
+  public List<String> listTablesByType(
+      String catalogName, String databaseName, String tablePattern, String tableType) {
+    return shim.listTablesByType(catalogName, databaseName, tablePattern, tableType);
+  }
+
   @Override
   public List<String> listTableNamesByFilter(
       String catalogName, String databaseName, String filter, short pageSize) {
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
index ba61ec65a4..e346d328bf 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShim.java
@@ -70,6 +70,9 @@ public abstract class HiveShim {
 
   public abstract List<String> getAllTables(String catalogName, String databaseName);
 
+  public abstract List<String> listTablesByType(
+      String catalogName, String databaseName, String tablePattern, String tableType);
+
   public abstract List<String> listTableNamesByFilter(
       String catalogName, String databaseName, String filter, short pageSize);
 
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java
index c82967cb77..dc3fbe2c91 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV2.java
@@ -32,6 +32,7 @@ import org.apache.gravitino.hive.converter.HiveDatabaseConverter;
 import org.apache.gravitino.hive.converter.HiveTableConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 
 class HiveShimV2 extends HiveShim {
@@ -115,6 +116,16 @@ class HiveShimV2 extends HiveShim {
     }
   }
 
+  @Override
+  public List<String> listTablesByType(
+      String catalogName, String databaseName, String tablePattern, String tableType) {
+    try {
+      return client.getTables(databaseName, tablePattern, TableType.valueOf(tableType));
+    } catch (Exception e) {
+      throw HiveExceptionConverter.toGravitinoException(e, ExceptionTarget.schema(databaseName));
+    }
+  }
+
   @Override
   public List<String> listTableNamesByFilter(
       String catalogName, String databaseName, String filter, short pageSize) {
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java
index 655981698c..1d6fca6ed9 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/client/HiveShimV3.java
@@ -34,6 +34,7 @@ import org.apache.gravitino.hive.converter.HiveDatabaseConverter;
 import org.apache.gravitino.hive.converter.HiveTableConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
 
@@ -51,6 +52,7 @@ class HiveShimV3 extends HiveShimV2 {
   private final Method alterTableMethod;
   private final Method dropTableMethod;
   private final Method getAllTablesMethod;
+  private final Method getTablesByTypeMethod;
   private final Method listTableNamesByFilterMethod;
   private final Method listPartitionNamesMethod;
   private final Method listPartitionsMethod;
@@ -110,6 +112,9 @@ class HiveShimV3 extends HiveShimV2 {
               "dropTable", String.class, String.class, String.class, boolean.class, boolean.class);
       this.getAllTablesMethod =
           IMetaStoreClient.class.getMethod("getAllTables", String.class, String.class);
+      this.getTablesByTypeMethod =
+          IMetaStoreClient.class.getMethod(
+              "getTables", String.class, String.class, String.class, TableType.class);
       this.listTableNamesByFilterMethod =
           IMetaStoreClient.class.getMethod(
               "listTableNamesByFilter", String.class, String.class, String.class, int.class);
@@ -242,6 +247,21 @@ class HiveShimV3 extends HiveShimV2 {
             databaseName);
   }
 
+  @Override
+  public List<String> listTablesByType(
+      String catalogName, String databaseName, String tablePattern, String tableType) {
+    TableType hiveTableType = TableType.valueOf(tableType);
+    return (List<String>)
+        invoke(
+            ExceptionTarget.schema(databaseName),
+            client,
+            getTablesByTypeMethod,
+            catalogName,
+            databaseName,
+            tablePattern,
+            hiveTableType);
+  }
+
   @Override
   public List<String> listTableNamesByFilter(
       String catalogName, String databaseName, String filter, short pageSize) {
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
index 5c7079bf33..27100a6026 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
@@ -33,13 +33,16 @@ import static org.apache.gravitino.rel.expressions.transforms.Transforms.identit
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.catalog.hive.StorageFormat;
 import org.apache.gravitino.hive.HiveColumn;
 import org.apache.gravitino.hive.HivePartition;
@@ -84,6 +87,9 @@ public class HiveTableConverter {
       // Hive2 doesn't have getCatName method, catalogName will be null
     }
 
+    String viewOriginalText =
+        StringUtils.defaultString(table.getViewOriginalText(), table.getViewExpandedText());
+
     HiveTable hiveTable =
         HiveTable.builder()
             .withName(table.getTableName())
@@ -96,6 +102,7 @@ public class HiveTableConverter {
             .withPartitioning(partitioning)
             .withCatalogName(catalogName)
             .withDatabaseName(table.getDbName())
+            .withViewOriginalText(viewOriginalText)
             .build();
     return hiveTable;
   }
@@ -106,6 +113,11 @@ public class HiveTableConverter {
 
     Optional.ofNullable(table.getTableType()).ifPresent(t -> properties.put(TABLE_TYPE, t));
 
+    // VIRTUAL_VIEW tables may have a minimal or absent StorageDescriptor — skip SD fields.
+    if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType())) {
+      return properties;
+    }
+
     StorageDescriptor sd = table.getSd();
     properties.put(LOCATION, sd.getLocation());
     properties.put(INPUT_FORMAT, sd.getInputFormat());
@@ -135,13 +147,29 @@ public class HiveTableConverter {
         hiveTable.properties().getOrDefault(TABLE_TYPE, String.valueOf(TableType.MANAGED_TABLE));
     table.setTableType(tableType.toUpperCase());
 
-    List<FieldSchema> partitionFields =
-        hiveTable.partitionFieldNames().stream()
-            .map(fieldName -> buildPartitionKeyField(fieldName, hiveTable))
-            .collect(Collectors.toList());
-    table.setSd(buildStorageDescriptor(hiveTable, partitionFields));
+    if (TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(tableType)) {
+      // Views require a minimal StorageDescriptor (HMS validates its presence), while the output
+      // schema is stored in sd.cols.
+      StorageDescriptor sd = new StorageDescriptor();
+      sd.setCols(buildStorageDescriptor(hiveTable, Collections.emptyList()).getCols());
+      sd.setSerdeInfo(new SerDeInfo());
+      table.setSd(sd);
+      table.setPartitionKeys(new ArrayList<>());
+      String viewOriginalText = hiveTable.viewOriginalText();
+      if (viewOriginalText != null) {
+        table.setViewOriginalText(viewOriginalText);
+        table.setViewExpandedText(viewOriginalText);
+      }
+    } else {
+      List<FieldSchema> partitionFields =
+          hiveTable.partitionFieldNames().stream()
+              .map(fieldName -> buildPartitionKeyField(fieldName, hiveTable))
+              .collect(Collectors.toList());
+      table.setSd(buildStorageDescriptor(hiveTable, partitionFields));
+      table.setPartitionKeys(partitionFields);
+    }
+
     table.setParameters(buildTableParameters(hiveTable));
-    table.setPartitionKeys(partitionFields);
 
     // Set AuditInfo to Hive's Table object. Hive's Table doesn't support setting last modifier
     // and last modified time, so we only set creator and create time.
@@ -282,7 +310,7 @@ public class HiveTableConverter {
   public static Distribution getDistribution(org.apache.hadoop.hive.metastore.api.Table table) {
     StorageDescriptor sd = table.getSd();
     Distribution distribution = Distributions.NONE;
-    if (sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) {
+    if (sd != null && sd.getBucketCols() != null && !sd.getBucketCols().isEmpty()) {
       // Hive table use hash strategy as bucketing strategy
       distribution =
           Distributions.hash(
@@ -295,7 +323,7 @@ public class HiveTableConverter {
   public static SortOrder[] getSortOrders(org.apache.hadoop.hive.metastore.api.Table table) {
     SortOrder[] sortOrders = SortOrders.NONE;
     StorageDescriptor sd = table.getSd();
-    if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
+    if (sd != null && sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
       sortOrders =
           sd.getSortCols().stream()
               .map(
@@ -309,26 +337,30 @@ public class HiveTableConverter {
   }
 
   public static Transform[] getPartitioning(org.apache.hadoop.hive.metastore.api.Table table) {
-    return table.getPartitionKeys().stream()
-        .map(p -> identity(p.getName()))
-        .toArray(Transform[]::new);
+    List<FieldSchema> partitionKeys =
+        table.getPartitionKeys() == null ? Collections.emptyList() : table.getPartitionKeys();
+    return partitionKeys.stream().map(p -> identity(p.getName())).toArray(Transform[]::new);
   }
 
   public static Column[] getColumns(org.apache.hadoop.hive.metastore.api.Table table) {
     StorageDescriptor sd = table.getSd();
+    List<FieldSchema> storageColumns =
+        sd == null || sd.getCols() == null ? Collections.emptyList() : sd.getCols();
+    List<FieldSchema> partitionKeys =
+        table.getPartitionKeys() == null ? Collections.emptyList() : table.getPartitionKeys();
     // Collect column names from sd.getCols() to check for duplicates
     Set<String> columnNames =
-        sd.getCols().stream().map(FieldSchema::getName).collect(Collectors.toSet());
+        storageColumns.stream().map(FieldSchema::getName).collect(Collectors.toSet());
 
     return Stream.concat(
-            sd.getCols().stream()
+            storageColumns.stream()
                 .map(
                     f ->
                         buildColumn(
                             f.getName(),
                             HiveDataTypeConverter.CONVERTER.toGravitino(f.getType()),
                             f.getComment())),
-            table.getPartitionKeys().stream()
+            partitionKeys.stream()
                 // Filter out partition keys that already exist in sd.getCols()
                 .filter(p -> !columnNames.contains(p.getName()))
                 .map(
diff --git a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHive2HMS.java b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHive2HMS.java
index 8c30e0b18a..6dbc9d5edd 100644
--- a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHive2HMS.java
+++ b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/client/TestHive2HMS.java
@@ -50,6 +50,7 @@ import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -134,6 +135,11 @@ public class TestHive2HMS {
       List<String> allTables = hiveClient.getAllTables(catalogName, dbName);
       Assertions.assertTrue(allTables.contains(tableName), "Table should be in the list");
 
+      List<String> managedTables =
+          hiveClient.listTablesByType(catalogName, dbName, "*", TableType.MANAGED_TABLE.name());
+      Assertions.assertTrue(
+          managedTables.contains(tableName), "Managed table list should contain the table");
+
       HiveTable loadedTable = hiveClient.getTable(catalogName, dbName, tableName);
       Assertions.assertNotNull(loadedTable, "Loaded table should not be null");
       Assertions.assertEquals(tableName, loadedTable.name(), "Table name should match");
diff --git a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java
index 3b9945807b..a811bd835b 100644
--- a/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java
+++ b/catalogs/hive-metastore-common/src/test/java/org/apache/gravitino/hive/converter/TestHiveTableConverter.java
@@ -18,11 +18,21 @@
  */
 package org.apache.gravitino.hive.converter;
 
+import static org.apache.gravitino.catalog.hive.HiveConstants.TABLE_TYPE;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+import java.time.Instant;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.hive.HiveTable;
+import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -154,4 +164,70 @@ public class TestHiveTableConverter {
     assertEquals("id", columns[0].name());
     assertEquals("name", columns[1].name());
   }
+
+  @Test
+  public void testToHiveTablePreservesVirtualViewColumnsInStorageDescriptor() {
+    Column[] columns = {
+      Column.of("id", Types.IntegerType.get(), "ID column"),
+      Column.of("name", Types.StringType.get(), "Name column")
+    };
+    HiveTable hiveTable =
+        HiveTable.builder()
+            .withName("v_orders")
+            .withDatabaseName("db")
+            .withColumns(columns)
+            .withProperties(new HashMap<>(Map.of(TABLE_TYPE, TableType.VIRTUAL_VIEW.name())))
+            .withAuditInfo(
+                AuditInfo.builder().withCreator("tester").withCreateTime(Instant.now()).build())
+            .withViewOriginalText("SELECT id, name FROM t")
+            .build();
+
+    Table table = HiveTableConverter.toHiveTable(hiveTable);
+
+    assertNotNull(table.getSd());
+    assertEquals(2, table.getSd().getColsSize());
+    assertEquals("id", table.getSd().getCols().get(0).getName());
+    assertEquals("name", table.getSd().getCols().get(1).getName());
+    assertEquals("SELECT id, name FROM t", table.getViewOriginalText());
+    assertEquals("SELECT id, name FROM t", table.getViewExpandedText());
+  }
+
+  @Test
+  public void testToHiveTableMirrorsOriginalTextToExpandedText() {
+    Column[] columns = {Column.of("id", Types.IntegerType.get(), "ID column")};
+    HiveTable hiveTable =
+        HiveTable.builder()
+            .withName("v_orders")
+            .withDatabaseName("db")
+            .withColumns(columns)
+            .withProperties(new HashMap<>(Map.of(TABLE_TYPE, TableType.VIRTUAL_VIEW.name())))
+            .withAuditInfo(
+                AuditInfo.builder().withCreator("tester").withCreateTime(Instant.now()).build())
+            .withViewOriginalText("SELECT `db`.`t`.`id` FROM `db`.`t`")
+            .build();
+
+    Table table = HiveTableConverter.toHiveTable(hiveTable);
+
+    assertEquals("SELECT `db`.`t`.`id` FROM `db`.`t`", table.getViewOriginalText());
+    assertEquals("SELECT `db`.`t`.`id` FROM `db`.`t`", table.getViewExpandedText());
+  }
+
+  @Test
+  public void testFromVirtualViewWithoutStorageDescriptorDoesNotThrow() {
+    Table table = new Table();
+    table.setTableName("v_orders");
+    table.setDbName("db");
+    table.setTableType(TableType.VIRTUAL_VIEW.name());
+    table.setParameters(new HashMap<>());
+    table.setPartitionKeys(null);
+    table.setSd(null);
+    table.setViewOriginalText(null);
+    table.setViewExpandedText("SELECT `1`");
+
+    HiveTable hiveTable = assertDoesNotThrow(() -> HiveTableConverter.fromHiveTable(table));
+
+    assertEquals("v_orders", hiveTable.name());
+    assertEquals(0, hiveTable.columns().length);
+    assertEquals("SELECT `1`", hiveTable.viewOriginalText());
+  }
 }

Reply via email to