This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cb455bbb67 HIVE-29578: Iceberg: add support for logical views (#6449)
1cb455bbb67 is described below

commit 1cb455bbb6753e548e1435f4345b4e0d4644bf9f
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Wed Jun 10 10:03:22 2026 -0400

    HIVE-29578: Iceberg: add support for logical views (#6449)
    
    * HIVE-29578: Iceberg: support for Iceberg logical views.
    
    ---------
    
    Co-authored-by: Dmitriy Fingerman <[email protected]>
    Co-authored-by: Dmitriy Fingerman <[email protected]>
---
 .../java/org/apache/hadoop/hive/ql/ErrorMsg.java   |   1 +
 .../iceberg/hive/HMSTablePropertyHelper.java       |   1 +
 .../apache/iceberg/hive/HiveOperationsBase.java    |   2 +-
 .../apache/iceberg/hive/HiveViewOperations.java    |   2 +-
 .../apache/iceberg/hive/IcebergViewSupport.java    | 143 ++++++++
 .../org/apache/iceberg/hive/MetastoreUtil.java     |  92 ++++-
 .../iceberg/hive/client/HiveRESTCatalogClient.java | 105 +++++-
 .../iceberg/hive/TestIcebergViewSupport.java       | 145 ++++++++
 .../hive/client/TestHiveRESTCatalogClient.java     |  61 +++-
 .../iceberg/mr/hive/BaseHiveIcebergMetaHook.java   |  34 ++
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  55 ++-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  25 ++
 .../positive/iceberg_rest_catalog_gravitino.q      |  49 ++-
 .../queries/positive/iceberg_rest_catalog_hms.q    |  29 +-
 .../src/test/queries/positive/iceberg_view.q       |  87 +++++
 .../src/test/results/positive/iceberg_view.q.out   | 380 +++++++++++++++++++++
 .../llap/iceberg_rest_catalog_gravitino.q.out      | 255 +++++++++++++-
 .../positive/llap/iceberg_rest_catalog_hms.q.out   | 140 +++++++-
 .../hive/TestHiveRESTCatalogClientITBase.java      |  96 +++++-
 .../hadoop/hive/ql/parse/TestParseCreateView.java  |  41 +++
 .../ql/ddl/view/create/CreateViewAnalyzer.java     |  63 +++-
 .../hive/ql/ddl/view/create/CreateViewDesc.java    |  18 +-
 .../ql/ddl/view/create/CreateViewOperation.java    |  23 ++
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   5 +-
 .../ql/metadata/CreateExternalViewRequest.java     |  89 +++++
 .../hive/ql/metadata/HiveStorageHandler.java       |   8 +
 .../apache/hadoop/hive/ql/parse/StorageFormat.java |  13 +
 27 files changed, 1906 insertions(+), 56 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java 
b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 57f7cadac18..156d7c7ebe7 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -445,6 +445,7 @@ public enum ErrorMsg {
   @Deprecated // kept for backwards reference
   REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with 
materialized view", true),
   REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view 
{0} with view", true),
+  VIEW_STORAGE_HANDLER_UNSUPPORTED(10448, "Storage handler {0} doesn't support 
external logical views", true),
   UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
   MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild 
could not be retrieved"),
   MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE 
clauses " +
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
index f7de4dd0719..43098c9913a 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
@@ -161,6 +161,7 @@ public static void updateHmsTableForIcebergView(
         HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
         metadata.schema(),
         maxHiveTablePropertySize);
+    parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, 
HIVE_ICEBERG_STORAGE_HANDLER);
     tbl.setParameters(parameters);
   }
 
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
index 3afff626bac..bf2684daa3f 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveOperationsBase.java
@@ -46,7 +46,7 @@
 import org.slf4j.LoggerFactory;
 
 /** All the HMS operations like table,view,materialized_view should implement 
this. */
-interface HiveOperationsBase {
+public interface HiveOperationsBase {
 
   Logger LOG = LoggerFactory.getLogger(HiveOperationsBase.class);
   // The max size is based on HMS backend database. For Hive versions below 
2.3, the max table
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
index c0c959974a6..10f6736c353 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveViewOperations.java
@@ -302,7 +302,7 @@ private Table newHMSView(ViewMetadata metadata) {
         tableType().name());
   }
 
-  private String sqlFor(ViewMetadata metadata) {
+  public static String sqlFor(ViewMetadata metadata) {
     SQLViewRepresentation closest = null;
     for (ViewRepresentation representation : 
metadata.currentVersion().representations()) {
       if (representation instanceof SQLViewRepresentation) {
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergViewSupport.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergViewSupport.java
new file mode 100644
index 00000000000..26149ad413b
--- /dev/null
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/IcebergViewSupport.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.view.ViewBuilder;
+
+/**
+ * Commits a native Iceberg view through the configured default Iceberg 
catalog (HiveCatalog or REST
+ * catalog, etc.) when {@code Catalog} also implements {@link ViewCatalog}.
+ */
+public final class IcebergViewSupport {
+
+  private IcebergViewSupport() {
+  }
+
+  /**
+   * Loads the native Iceberg logical view definition and applies SQL, schema, 
and Iceberg params to {@code hmsTable}
+   */
+  public static void enrichHmsTableFromIcebergView(
+      org.apache.hadoop.hive.metastore.api.Table hmsTable, Configuration conf) 
{
+    TableIdentifier identifier = TableIdentifier.of(hmsTable.getDbName(), 
hmsTable.getTableName());
+    String catalogName = IcebergCatalogProperties.getCatalogName(conf);
+    Map<String, String> catalogProps = 
IcebergCatalogProperties.getCatalogProperties(conf, catalogName);
+    Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, 
catalogProps, conf);
+    runWithCatalog(catalog, () -> loadAndApplyView(hmsTable, conf, catalog, 
catalogName, identifier));
+  }
+
+  private static void loadAndApplyView(
+      org.apache.hadoop.hive.metastore.api.Table hmsTable,
+      Configuration conf,
+      Catalog catalog,
+      String catalogName,
+      TableIdentifier identifier) {
+    ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName);
+    MetastoreUtil.applyIcebergViewToHmsTable(hmsTable, 
viewCatalog.loadView(identifier), conf);
+  }
+
+  public static void createOrReplaceView(
+      Configuration conf,
+      String databaseName,
+      String viewName,
+      List<FieldSchema> fieldSchemas,
+      String viewSql,
+      Map<String, String> tblProperties,
+      String comment) {
+
+    TableIdentifier identifier = TableIdentifier.of(databaseName, viewName);
+    String catalogName = IcebergCatalogProperties.getCatalogName(conf);
+    Map<String, String> catalogProps = 
IcebergCatalogProperties.getCatalogProperties(conf, catalogName);
+    Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, 
catalogProps, conf);
+    runWithCatalog(
+        catalog,
+        () -> commitView(catalog, catalogName, identifier, fieldSchemas, 
viewSql, tblProperties, comment));
+  }
+
+  /**
+   * Runs {@code action} with {@code catalog}, then closes it when the catalog 
implements
+   * {@link Closeable} (e.g. REST catalog clients).
+   */
+  private static void runWithCatalog(Catalog catalog, Runnable action) {
+    if (catalog instanceof Closeable closeable) {
+      try (Closeable ignored = closeable) {
+        action.run();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close Iceberg catalog", e);
+      }
+    } else {
+      action.run();
+    }
+  }
+
+  private static void commitView(
+      Catalog catalog,
+      String catalogName,
+      TableIdentifier identifier,
+      List<FieldSchema> fieldSchemas,
+      String viewSql,
+      Map<String, String> tblProperties,
+      String comment) {
+    ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName);
+
+    ViewBuilder builder =
+        viewCatalog
+            .buildView(identifier)
+            .withSchema(HiveSchemaUtil.convert(fieldSchemas, 
Collections.emptyMap(), true))
+            
.withDefaultNamespace(Namespace.of(identifier.namespace().level(0)))
+            .withQuery("hive", viewSql);
+
+    if (StringUtils.isNotBlank(comment)) {
+      builder = builder.withProperty("comment", comment);
+    }
+
+    Map<String, String> tblProps =
+        tblProperties == null ? Maps.newHashMap() : 
Maps.newHashMap(tblProperties);
+
+    builder.withProperties(tblProps);
+
+    builder.createOrReplace();
+  }
+
+  private static ViewCatalog asViewCatalog(Catalog catalog, String 
catalogName) {
+    if (catalog instanceof ViewCatalog viewCatalog) {
+      return viewCatalog;
+    }
+    throw new UnsupportedOperationException(
+        String.format(
+                "Iceberg catalog '%s' does not implement ViewCatalog.",
+                catalogName) +
+            " Iceberg views require a catalog that implements ViewCatalog 
(e.g. HiveCatalog or REST).");
+  }
+}
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
index 95e1e5b3662..94b89cb8c0e 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/MetastoreUtil.java
@@ -36,8 +36,10 @@
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Schema;
@@ -46,6 +48,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.SQLViewRepresentation;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewMetadata;
 import org.apache.thrift.TException;
 
 public class MetastoreUtil {
@@ -134,7 +141,11 @@ public static Table toHiveTable(org.apache.iceberg.Table 
table, Configuration co
     result.setDbName(tableName.getDb());
     result.setTableName(tableName.getTable());
     result.setTableType(TableType.EXTERNAL_TABLE.toString());
-    result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
+
+    // TODO: Revert after HIVE-29633 is fixed
+    // result.setPartitionKeys(getPartitionKeys(table, table.spec().specId()));
+    result.setPartitionKeys(Lists.newArrayList());
+
     TableMetadata metadata = ((BaseTable) table).operations().current();
     long maxHiveTablePropertySize = 
conf.getLong(HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
         HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
@@ -142,12 +153,89 @@ public static Table toHiveTable(org.apache.iceberg.Table 
table, Configuration co
         null, true, maxHiveTablePropertySize, null);
     String catalogType = IcebergCatalogProperties.getCatalogType(conf);
     if (!StringUtils.isEmpty(catalogType) && 
!IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) {
-      result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
IcebergCatalogProperties.getCatalogType(conf));
+      result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
catalogType);
     }
     result.setSd(getHiveStorageDescriptor(table));
     return result;
   }
 
+  /**
+   * Builds a minimal HMS {@link Table} shell for Iceberg view (identity, view 
type,
+   * and Iceberg storage-handler markers only). The storage handler {@code 
postGetTable} hook enriches
+   * this object via {@link IcebergViewSupport#enrichHmsTableFromIcebergView} 
(view SQL,
+   * schema, and Iceberg parameters).
+   */
+  public static Table buildMinimalHMSView(String catName, String dbName, 
String tableName) {
+    Table result = new Table();
+    result.setCatName(catName);
+    result.setDbName(dbName);
+    result.setTableName(tableName);
+    result.setTableType(TableType.VIRTUAL_VIEW.toString());
+
+    Map<String, String> parameters = Maps.newHashMap();
+    parameters.put(
+        BaseMetastoreTableOperations.TABLE_TYPE_PROP, 
HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE);
+    parameters.put(
+        hive_metastoreConstants.META_TABLE_STORAGE, 
HMSTablePropertyHelper.HIVE_ICEBERG_STORAGE_HANDLER);
+    result.setParameters(parameters);
+    return result;
+  }
+
+  /**
+   * Applies Iceberg view metadata (SQL, schema, params) onto an existing HMS 
{@link Table}.
+   */
+  public static void applyIcebergViewToHmsTable(Table hmsTable, View view, 
Configuration conf) {
+    ViewMetadata metadata = ((BaseView) view).operations().current();
+    String sqlText = viewSqlText(view, metadata);
+
+    boolean hiveEngineEnabled = false;
+    hmsTable.setSd(HiveOperationsBase.storageDescriptor(metadata.schema(), 
metadata.location(), hiveEngineEnabled));
+    StorageDescriptor sd = hmsTable.getSd();
+
+    if (sd.getBucketCols() == null) {
+      sd.setBucketCols(Lists.newArrayList());
+    }
+
+    if (sd.getSortCols() == null) {
+      sd.setSortCols(Lists.newArrayList());
+    }
+
+    long maxHiveTablePropertySize =
+        conf.getLong(
+            HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE,
+            HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
+    HMSTablePropertyHelper.updateHmsTableForIcebergView(
+        metadata.metadataFileLocation(),
+        hmsTable,
+        metadata,
+        Collections.emptySet(),
+        maxHiveTablePropertySize,
+        null);
+
+    hmsTable.setCreateTime((int) (metadata.version(1).timestampMillis() / 
1000));
+    hmsTable.setLastAccessTime((int) 
(metadata.currentVersion().timestampMillis() / 1000));
+    hmsTable.setOwner(
+        PropertyUtil.propertyAsString(
+            metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, 
HiveHadoopUtil.currentUser()));
+
+    // In-memory overlay for compile/describe: authoritative SQL comes from 
Iceberg metadata.
+    hmsTable.setViewOriginalText(sqlText);
+    hmsTable.setViewExpandedText(sqlText);
+
+    String catalogType = IcebergCatalogProperties.getCatalogType(conf);
+    if (!StringUtils.isEmpty(catalogType) && 
!IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) {
+      hmsTable.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
IcebergCatalogProperties.getCatalogType(conf));
+    }
+  }
+
+  private static String viewSqlText(View view, ViewMetadata metadata) {
+    SQLViewRepresentation hiveRepr = view.sqlFor("hive");
+    if (hiveRepr != null) {
+      return hiveRepr.sql();
+    }
+    return HiveViewOperations.sqlFor(metadata);
+  }
+
   private static StorageDescriptor 
getHiveStorageDescriptor(org.apache.iceberg.Table table) {
     var result = new StorageDescriptor();
     result.setCols(HiveSchemaUtil.convert(table.schema()));
diff --git 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
index a0ca7c937aa..30f9e7dcb46 100644
--- 
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
+++ 
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/client/HiveRESTCatalogClient.java
@@ -21,12 +21,15 @@
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
@@ -38,16 +41,20 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.client.BaseMetaStoreClient;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveOperationsBase;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.hive.IcebergTableProperties;
+import org.apache.iceberg.hive.IcebergViewSupport;
 import org.apache.iceberg.hive.MetastoreUtil;
 import org.apache.iceberg.hive.RuntimeMetaException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -119,10 +126,20 @@ public List<String> getTables(String catName, String 
dbName, String tablePattern
     Pattern pattern = Pattern.compile(regex);
 
     // List tables from the specific database (namespace) and filter them.
-    return restCatalog.listTables(Namespace.of(dbName)).stream()
+    Set<String> names = new LinkedHashSet<>();
+    restCatalog.listTables(Namespace.of(dbName)).stream()
         .map(TableIdentifier::name)
         .filter(pattern.asPredicate())
-        .toList();
+        .forEach(names::add);
+
+    if (restCatalog instanceof ViewCatalog viewCatalog) {
+      viewCatalog
+          .listViews(Namespace.of(dbName)).stream()
+          .map(TableIdentifier::name)
+          .filter(pattern.asPredicate())
+          .forEach(names::add);
+    }
+    return Lists.newArrayList(names);
   }
 
   @Override
@@ -132,7 +149,12 @@ public List<String> getAllTables(String catName, String 
dbName) {
 
   @Override
   public void dropTable(Table table, boolean deleteData, boolean 
ignoreUnknownTab, boolean ifPurge) throws TException {
-    restCatalog.dropTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()));
+    TableIdentifier id = TableIdentifier.of(table.getDbName(), 
table.getTableName());
+    if (restCatalog instanceof ViewCatalog viewCatalog && 
viewCatalog.viewExists(id)) {
+      viewCatalog.dropView(id);
+    } else {
+      restCatalog.dropTable(id);
+    }
   }
 
   private void validateCurrentCatalog(String catName) {
@@ -145,7 +167,11 @@ private void validateCurrentCatalog(String catName) {
   @Override
   public boolean tableExists(String catName, String dbName, String tableName) {
     validateCurrentCatalog(catName);
-    return restCatalog.tableExists(TableIdentifier.of(dbName, tableName));
+    TableIdentifier id = TableIdentifier.of(dbName, tableName);
+    if (restCatalog.tableExists(id)) {
+      return true;
+    }
+    return restCatalog instanceof ViewCatalog viewCatalog && 
viewCatalog.viewExists(id);
   }
 
   @Override
@@ -174,25 +200,58 @@ public Database getDatabase(String catName, String 
dbName) throws NoSuchObjectEx
   @Override
   public Table getTable(GetTableRequest tableRequest) throws TException {
     validateCurrentCatalog(tableRequest.getCatName());
-    org.apache.iceberg.Table icebergTable;
+    TableIdentifier id =
+        TableIdentifier.of(tableRequest.getDbName(), 
tableRequest.getTblName());
     try {
-      icebergTable = 
restCatalog.loadTable(TableIdentifier.of(tableRequest.getDbName(),
-          tableRequest.getTblName()));
-    } catch (NoSuchTableException exception) {
+      org.apache.iceberg.Table icebergTable = restCatalog.loadTable(id);
+      return MetastoreUtil.toHiveTable(icebergTable, conf);
+    } catch (NoSuchTableException tableMissing) {
+      if (restCatalog instanceof ViewCatalog viewCatalog) {
+        if (!viewCatalog.viewExists(id)) {
+          throw new NoSuchObjectException();
+        }
+        return MetastoreUtil.buildMinimalHMSView(
+            tableRequest.getCatName(), tableRequest.getDbName(), 
tableRequest.getTblName());
+      }
       throw new NoSuchObjectException();
     }
-    return MetastoreUtil.toHiveTable(icebergTable, conf);
+  }
+
+  private static boolean hasIcebergViewTableType(Table table) {
+    if (!TableType.VIRTUAL_VIEW.toString().equals(table.getTableType())) {
+      return false;
+    }
+    Map<String, String> params = table.getParameters();
+    if (params == null) {
+      return false;
+    }
+    return HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(
+        params.get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
+  }
+
+  @Override
+  public void alter_table(String catName, String dbName, String tblName, Table 
newTable,
+      EnvironmentContext envContext, String validWriteIdList) {
+    validateCurrentCatalog(catName);
+    if (hasIcebergViewTableType(newTable) && restCatalog instanceof 
ViewCatalog) {
+      createOrReplaceIcebergView(newTable, dbName, tblName);
+    }
   }
 
   @Override
   public void createTable(CreateTableRequest request) throws TException {
     Table table = request.getTable();
-    List<FieldSchema> cols = Lists.newArrayList(table.getSd().getCols());
-    if (table.isSetPartitionKeys() && !table.getPartitionKeys().isEmpty()) {
-      cols.addAll(table.getPartitionKeys());
+    if (hasIcebergViewTableType(table) && restCatalog instanceof ViewCatalog) {
+      createOrReplaceIcebergView(table, table.getDbName(), 
table.getTableName());
+    } else {
+      createIcebergTable(request);
     }
+  }
+
+  private void createIcebergTable(CreateTableRequest request) {
+    Table table = request.getTable();
     Properties tableProperties = 
IcebergTableProperties.getTableProperties(table, conf);
-    Schema schema = HiveSchemaUtil.convert(cols, Collections.emptyMap(), true);
+    Schema schema = HiveSchemaUtil.convert(hmsTableColumns(table), 
Collections.emptyMap(), true);
     Map<String, String> envCtxProps = 
Optional.ofNullable(request.getEnvContext())
         .map(EnvironmentContext::getProperties)
         .orElse(Collections.emptyMap());
@@ -200,7 +259,8 @@ public void createTable(CreateTableRequest request) throws 
TException {
         HMSTablePropertyHelper.getPartitionSpec(envCtxProps, schema);
     SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(tableProperties, 
schema);
 
-    restCatalog.buildTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()), schema)
+    restCatalog
+        .buildTable(TableIdentifier.of(table.getDbName(), 
table.getTableName()), schema)
         .withPartitionSpec(partitionSpec)
         
.withLocation(tableProperties.getProperty(IcebergTableProperties.LOCATION))
         .withSortOrder(sortOrder)
@@ -208,6 +268,23 @@ public void createTable(CreateTableRequest request) throws 
TException {
         .create();
   }
 
+  private void createOrReplaceIcebergView(Table table, String dbName, String 
tableName) {
+    Map<String, String> tblProps =
+        table.getParameters() == null ? Maps.newHashMap() : 
Maps.newHashMap(table.getParameters());
+    String comment = tblProps.get("comment");
+    List<FieldSchema> cols = Lists.newArrayList(table.getSd().getCols());
+    IcebergViewSupport.createOrReplaceView(
+        conf, dbName, tableName, cols, table.getViewExpandedText(), tblProps, 
comment);
+  }
+
+  private static List<FieldSchema> hmsTableColumns(Table table) {
+    List<FieldSchema> cols = Lists.newArrayList(table.getSd().getCols());
+    if (table.isSetPartitionKeys() && !table.getPartitionKeys().isEmpty()) {
+      cols.addAll(table.getPartitionKeys());
+    }
+    return cols;
+  }
+
   @Override
   public void createDatabase(Database db) {
     validateCurrentCatalog(db.getCatalogName());
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestIcebergViewSupport.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestIcebergViewSupport.java
new file mode 100644
index 00000000000..f8691f18e25
--- /dev/null
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestIcebergViewSupport.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TestIcebergViewSupport {
+
+  private static final String DB = "native_vw_db";
+  private static final String VIEW = "native_vw";
+
+  @RegisterExtension
+  private static final HiveMetastoreExtension HIVE_METASTORE_EXTENSION =
+      HiveMetastoreExtension.builder().withDatabase(DB).build();
+
+  @AfterEach
+  void dropView() {
+    HiveCatalog cat = loadCatalog();
+    TableIdentifier id = TableIdentifier.of(DB, VIEW);
+    cat.dropView(id);
+  }
+
+  private HiveConf nativeViewConf() {
+    HiveConf conf = new HiveConf(HIVE_METASTORE_EXTENSION.hiveConf());
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CATALOG_DEFAULT, "hive");
+    conf.set(
+        IcebergCatalogProperties.catalogPropertyConfigKey("hive", 
ICEBERG_CATALOG_TYPE),
+        ICEBERG_CATALOG_TYPE_HIVE);
+    return conf;
+  }
+
+  private HiveCatalog loadCatalog() {
+    return (HiveCatalog)
+        CatalogUtil.loadCatalog(
+            HiveCatalog.class.getName(),
+            ICEBERG_CATALOG_TYPE_HIVE,
+            ImmutableMap.of(
+                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+                String.valueOf(TimeUnit.SECONDS.toMillis(10))),
+            HIVE_METASTORE_EXTENSION.hiveConf());
+  }
+
+  @Test
+  void testCreateCommitsNativeViewWithUserProperties() {
+    HiveConf conf = nativeViewConf();
+    List<FieldSchema> cols =
+        Arrays.asList(new FieldSchema("id", "int", null), new 
FieldSchema("name", "string", null));
+    String sql = String.format("select id, name from %s.src_tbl", DB);
+    Map<String, String> props = Collections.singletonMap("k1", "v1");
+
+    IcebergViewSupport.createOrReplaceView(
+        conf, DB, VIEW, cols, sql, props, "hello-view");
+
+    HiveCatalog cat = loadCatalog();
+    TableIdentifier id = TableIdentifier.of(DB, VIEW);
+    assertThat(cat.viewExists(id)).isTrue();
+    View view = cat.loadView(id);
+    assertThat(view.properties())
+        .containsEntry("comment", "hello-view")
+        .containsEntry("k1", "v1")
+        .doesNotContainKey("hive.storage.external.logical.view.handler");
+    HiveViewOperations ops = (HiveViewOperations) ((BaseView) 
view).operations();
+    assertThat(ops.current().currentVersion().representations()).isNotEmpty();
+  }
+
+  @Test
+  void testCreateOrReplaceViewReplacesExisting() {
+    HiveConf conf = nativeViewConf();
+    List<FieldSchema> cols = Collections.singletonList(new FieldSchema("id", 
"int", null));
+    TableIdentifier id = TableIdentifier.of(DB, VIEW);
+
+    IcebergViewSupport.createOrReplaceView(
+        conf, DB, VIEW, cols, "select 1 as id", null, null);
+    View afterCreate = loadCatalog().loadView(id);
+    assertThat(afterCreate.sqlFor("hive").sql().trim()).isEqualTo("select 1 as 
id");
+
+    IcebergViewSupport.createOrReplaceView(
+        conf, DB, VIEW, cols, "select 2 as id", null, null);
+
+    assertThat(loadCatalog().viewExists(id)).isTrue();
+    View afterReplace = loadCatalog().loadView(id);
+    assertThat(afterReplace.sqlFor("hive").sql().trim()).isEqualTo("select 2 
as id");
+  }
+
+  @Test
+  void testEnrichHmsTableFromIcebergViewOverridesStaleHmsSql() throws 
TException {
+    HiveConf conf = nativeViewConf();
+    List<FieldSchema> cols = Collections.singletonList(new FieldSchema("id", 
"int", null));
+    String sql = "select 42 as id";
+
+    IcebergViewSupport.createOrReplaceView(
+        conf, DB, VIEW, cols, sql, null, null);
+
+    org.apache.hadoop.hive.metastore.api.Table hmsTable =
+        HIVE_METASTORE_EXTENSION.metastoreClient().getTable(DB, VIEW);
+    hmsTable.setViewOriginalText("select 0");
+    hmsTable.setViewExpandedText("select 0");
+
+    IcebergViewSupport.enrichHmsTableFromIcebergView(hmsTable, conf);
+    assertThat(hmsTable.getViewExpandedText()).isEqualTo(sql);
+    assertThat(hmsTable.getViewOriginalText()).isEqualTo(sql);
+
+    ViewMetadata metadata = ((BaseView) 
loadCatalog().loadView(TableIdentifier.of(DB, VIEW))).operations().current();
+    assertThat(hmsTable.getCreateTime()).isEqualTo((int) 
(metadata.version(1).timestampMillis() / 1000));
+  }
+}
diff --git 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
index 1ae7e742774..4f21f46b2e2 100644
--- 
a/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
+++ 
b/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/client/TestHiveRESTCatalogClient.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
@@ -43,7 +45,10 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.hive.HiveOperationsBase;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.IcebergViewSupport;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -59,8 +64,9 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 
-public class TestHiveRESTCatalogClient {
+class TestHiveRESTCatalogClient {
 
   private static HiveRESTCatalogClient spyHiveRESTCatalogClient;
   private static RESTCatalog mockRestCatalog;
@@ -137,13 +143,13 @@ public void after() {
   }
 
   @Test
-  public void testGetTable() throws TException {
+  void testGetTable() throws TException {
     spyHiveRESTCatalogClient.getTable("default", "tableName");
     Mockito.verify(mockRestCatalog).loadTable(TableIdentifier.of("default", 
"tableName"));
   }
 
   @Test
-  public void testCreateTable() throws TException {
+  void testCreateTable() throws TException {
     Table table = new Table();
     table.setTableName("tableName");
     table.setDbName("default");
@@ -155,7 +161,7 @@ public void testCreateTable() throws TException {
   }
 
   @Test
-  public void testCreatePartitionedTable() throws TException {
+  void testCreatePartitionedTable() throws TException {
     Table table = new Table();
     table.setTableName("tableName");
     table.setDbName("default");
@@ -192,9 +198,54 @@ public void testCreatePartitionedTable() throws TException 
{
   }
 
   @Test
-  public void testGetDatabase() throws TException {
+  void testGetDatabase() throws TException {
     Database aDefault = spyHiveRESTCatalogClient.getDatabase("default");
     assertThat(aDefault.getName()).isEqualTo("default");
     Mockito.verify(mockRestCatalog).listNamespaces(Namespace.empty());
   }
+
+  @Test
+  void testAlterIcebergView() {
+    RESTCatalog viewCapableCatalog =
+        Mockito.mock(RESTCatalog.class, 
Mockito.withSettings().extraInterfaces(ViewCatalog.class));
+    Mockito.doReturn("hive").when(viewCapableCatalog).name();
+    mockCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(any(), any(), 
any()))
+        .thenReturn(viewCapableCatalog);
+
+    Configuration configuration = new Configuration();
+    configuration.set("iceberg.catalog", "ice01");
+    configuration.set("iceberg.catalog.ice01.uri", "http://localhost";);
+    HiveRESTCatalogClient client = new HiveRESTCatalogClient(configuration);
+
+    try (MockedStatic<IcebergViewSupport> viewSupport =
+        Mockito.mockStatic(IcebergViewSupport.class)) {
+      client.alter_table("hive", "ice_db", "ice_v1", createIcebergView(), 
null, null);
+      viewSupport.verify(
+          () ->
+              IcebergViewSupport.createOrReplaceView(
+                  any(),
+                  eq("ice_db"),
+                  eq("ice_v1"),
+                  any(),
+                  eq("select 1"),
+                  any(),
+                  eq(null)));
+    }
+  }
+
+  private static Table createIcebergView() {
+    Table view = new Table();
+    view.setTableName("ice_v1");
+    view.setDbName("ice_db");
+    view.setTableType(TableType.VIRTUAL_VIEW.toString());
+    view.setViewExpandedText("select 1");
+    view.setSd(new StorageDescriptor());
+    view.getSd().setCols(Collections.singletonList(new FieldSchema("x", "int", 
"")));
+    view.setParameters(
+        Maps.newHashMap(
+            Map.of(
+                BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+                HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE)));
+    return view;
+  }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
index 69fbe5bf99c..5673b2e0da9 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java
@@ -34,6 +34,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -60,9 +61,11 @@
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.hive.HMSTablePropertyHelper;
+import org.apache.iceberg.hive.HiveOperationsBase;
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.hive.IcebergTableProperties;
+import org.apache.iceberg.hive.IcebergViewSupport;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -115,6 +118,16 @@ public BaseHiveIcebergMetaHook(Configuration conf) {
     this.conf = conf;
   }
 
+  public static boolean 
isIcebergView(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
+    if (hmsTable == null ||
+        hmsTable.getParameters() == null ||
+        !TableType.VIRTUAL_VIEW.toString().equals(hmsTable.getTableType())) {
+      return false;
+    }
+    String storageHandler = 
hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    return HiveMetaHook.HIVE_ICEBERG_STORAGE_HANDLER.equals(storageHandler);
+  }
+
   @Override
   public void preCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
     CreateTableRequest request = new CreateTableRequest(hmsTable);
@@ -127,6 +140,10 @@ public void preCreateTable(CreateTableRequest request) {
     if (hmsTable.isTemporary()) {
       throw new UnsupportedOperationException("Creation of temporary iceberg 
tables is not supported.");
     }
+    if (isIcebergView(hmsTable)) {
+      preCreateIcebergView(request);
+      return;
+    }
     this.tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
 
     // Set the table type even for non HiveCatalog based tables
@@ -204,6 +221,18 @@ public void preCreateTable(CreateTableRequest request) {
     setSortOrder(hmsTable, schema, tableProperties);
   }
 
+  private void preCreateIcebergView(CreateTableRequest request) {
+
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = request.getTable();
+    tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
+
+    hmsTable
+        .getParameters()
+        .put(
+            BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+            HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE);
+  }
+
   /**
    * Method for verification that necessary catalog configs are defined in 
Session Conf.
    *
@@ -504,6 +533,10 @@ protected void setWriteModeDefaults(Table icebergTbl, 
Map<String, String> newPro
   public void postGetTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
     if (hmsTable != null) {
       try {
+        if (isIcebergView(hmsTable)) {
+          IcebergViewSupport.enrichHmsTableFromIcebergView(hmsTable, conf);
+          return;
+        }
         Table tbl = IcebergTableUtil.getTable(conf, hmsTable);
         String formatVersion = String.valueOf(TableUtil.formatVersion(tbl));
         hmsTable.getParameters().put(TableProperties.FORMAT_VERSION, 
formatVersion);
@@ -531,4 +564,5 @@ private static boolean isHiveIcebergStorageHandler(String 
storageHandler) {
       throw new RuntimeException("Error checking storage handler class", e);
     }
   }
+
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 58c2d19373d..ea5401021de 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -118,6 +118,7 @@
 import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.hive.IcebergTableProperties;
+import org.apache.iceberg.hive.IcebergViewSupport;
 import org.apache.iceberg.hive.MetastoreLock;
 import org.apache.iceberg.hive.NoLock;
 import org.apache.iceberg.io.CloseableIterable;
@@ -176,13 +177,23 @@ public HiveIcebergMetaHook(Configuration conf) {
     super(conf);
   }
 
-  @Override
-  public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
-    // do nothing
-  }
-
   @Override
   public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
+    if (isIcebergView(hmsTable)) {
+      tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
+      Map<String, String> tblProps =
+          hmsTable.getParameters() == null ? Maps.newHashMap() : 
Maps.newHashMap(hmsTable.getParameters());
+      String comment = tblProps.get("comment");
+      IcebergViewSupport.createOrReplaceView(
+          conf,
+          hmsTable.getDbName(),
+          hmsTable.getTableName(),
+          hmsTable.getSd().getCols(),
+          hmsTable.getViewExpandedText(),
+          tblProps,
+          comment);
+      return;
+    }
     if (icebergTable == null) {
 
       
setFileFormat(tableProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT));
@@ -208,11 +219,6 @@ public void 
commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTabl
     }
   }
 
-  @Override
-  public void preDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
-    // do nothing
-  }
-
   @Override
   public void preDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, boolean deleteData) {
     this.tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
@@ -235,11 +241,6 @@ public void 
preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, bo
     }
   }
 
-  @Override
-  public void rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable) {
-    // do nothing
-  }
-
   @Override
   public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, boolean deleteData) {
     if (deleteData && deleteIcebergTable) {
@@ -265,6 +266,15 @@ public void 
commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable,
   @Override
   public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table 
hmsTable, EnvironmentContext context)
       throws MetaException {
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable)) {
+      currentAlterTableOp = null;
+      if (commitLock == null) {
+        commitLock = new NoLock();
+      }
+      commitLock.lock();
+      tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
+      return;
+    }
     tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
     setupAlterOperationType(hmsTable, context);
     if (AlterTableType.RENAME.equals(currentAlterTableOp)) {
@@ -493,6 +503,21 @@ public void 
commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
     if (commitLock == null) {
       throw new IllegalStateException("Hive commit lock should already be 
set");
     }
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable)) {
+      tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
+      Map<String, String> tblProps =
+          hmsTable.getParameters() == null ? Maps.newHashMap() : 
Maps.newHashMap(hmsTable.getParameters());
+      String comment = tblProps.get("comment");
+      IcebergViewSupport.createOrReplaceView(
+          conf,
+          hmsTable.getDbName(),
+          hmsTable.getTableName(),
+          hmsTable.getSd().getCols(),
+          hmsTable.getViewExpandedText(),
+          tblProps,
+          comment);
+      return;
+    }
     commitLock.unlock();
     if (isTableMigration) {
       tableProperties = IcebergTableProperties.getTableProperties(hmsTable, 
conf);
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 6da2162df55..fbc704a3619 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -383,6 +383,11 @@ public boolean supportsPartitioning() {
     return true;
   }
 
+  @Override
+  public boolean supportsExternalViewCatalog() {
+    return true;
+  }
+
   /**
    * @param jobConf Job configuration for InputFormat to access
    * @param deserializer Deserializer
@@ -428,6 +433,9 @@ public boolean 
canProvidePartitionStatistics(org.apache.hadoop.hive.ql.metadata.
     if (!getStatsSource().equals(HiveMetaHook.ICEBERG)) {
       return false;
     }
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable.getTTable())) {
+      return false;
+    }
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
     if (snapshot != null) {
@@ -891,6 +899,9 @@ public boolean supportsPartitionTransform() {
 
   @Override
   public List<TransformSpec> 
getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable.getTTable())) {
+      return Collections.emptyList();
+    }
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     return table.spec().fields().stream()
       .filter(f -> !f.transform().isVoid())
@@ -905,6 +916,9 @@ public List<TransformSpec> 
getPartitionTransformSpec(org.apache.hadoop.hive.ql.m
   @Override
   public Map<Integer, List<TransformSpec>> getPartitionTransformSpecs(
       org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable.getTTable())) {
+      return Collections.emptyMap();
+    }
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
     return table.specs().entrySet().stream().flatMap(e ->
       e.getValue().fields().stream()
@@ -1570,6 +1584,9 @@ public boolean supportsSortColumns() {
 
   @Override
   public List<FieldSchema> 
sortColumns(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable.getTTable())) {
+      return Collections.emptyList();
+    }
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
     Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
     if (table.sortOrder().isUnsorted()) {
@@ -2130,6 +2147,10 @@ public List<Partition> 
getPartitions(org.apache.hadoop.hive.ql.metadata.Table hm
   }
 
   public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table 
hmsTable) {
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable.getTTable())) {
+      List<FieldSchema> partCols = hmsTable.getPartCols();
+      return partCols != null && !partCols.isEmpty();
+    }
     if (!hmsTable.getTTable().isSetId()) {
       return false;
     }
@@ -2275,6 +2296,10 @@ public boolean 
canPerformMetadataDelete(org.apache.hadoop.hive.ql.metadata.Table
 
   @Override
   public List<FieldSchema> 
getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    if (BaseHiveIcebergMetaHook.isIcebergView(hmsTable.getTTable())) {
+      List<FieldSchema> partCols = hmsTable.getPartCols();
+      return partCols != null ? partCols : Collections.emptyList();
+    }
     if (!hmsTable.getTTable().isSetId()) {
       return Collections.emptyList();
     }
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q
 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q
index 81982ca44d9..91aa14fdc17 100644
--- 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_gravitino.q
@@ -15,8 +15,6 @@
 --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
 --! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 --! 
qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
--- Mask compaction id as they will be allocated in parallel threads
---! qt:replace:/^[0-9]/#Masked#/
 -- Mask removed file size
 --! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
 -- Mask iceberg version
@@ -47,7 +45,7 @@ partitioned by (company_id bigint)
 stored by iceberg stored as orc;
 
 -----------------------------------------------------------------------------
---! Creating  table with a valid catalog name in table properties
+--! Creating a table with a valid catalog name in table properties
 -----------------------------------------------------------------------------
 
 create table ice_orc2 (
@@ -74,6 +72,51 @@ VALUES ('fn1','ln1', 1, 10), ('fn2','ln2', 2, 20), 
('fn3','ln3', 3, 30);
 describe formatted ice_orc2;
 select * from ice_orc2;
 
+---------------------------------------------------------------------------------------------------------------------
+--! Iceberg views tests
+---------------------------------------------------------------------------------------------------------------------
+
+-----------------------------------------------------------------------------------------------------
+--! Iceberg view with TBLPROPERTIES ('view-format'='iceberg') on a REST 
catalog table
+-----------------------------------------------------------------------------------------------------
+
+create view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name, last_name from ice_orc2 where dept_id in (1, 3);
+
+select * from ice_v1;
+desc formatted ice_v1;
+
+------- if-not-exists view test - view should not change 
-------------------------
+
+create view if not exists ice_v1 tblproperties ('view-format'='iceberg')
+as select * from ice_orc2 where dept_id = 10000;
+
+select * from ice_v1;
+desc formatted ice_v1;
+
+------- replace view test - view should be replaced 
------------------------------
+
+create or replace view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name || '-' || dept_id from ice_orc2 where dept_id = 2;
+
+select * from ice_v1;
+desc formatted ice_v1;
+
+drop view ice_v1;
+
+-----------------------------------------------------------------------------------------------
+--! Iceberg view with default Iceberg storage handler and REST catalog table 
+-----------------------------------------------------------------------------------------------
+
+set 
hive.default.storage.handler.class=org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+
+create view ice_v2
+as select first_name, last_name || '-' || dept_id from ice_orc2 where team_id 
in (20, 30);
+
+select * from ice_v2;
+desc formatted ice_v2;
+drop view ice_v2;
+
 -----------------------------------------------------------------------------
 
 show tables;
diff --git 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_hms.q 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_hms.q
index 27f23122240..a1d8cb6b905 100644
--- 
a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_hms.q
+++ 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_rest_catalog_hms.q
@@ -15,8 +15,6 @@
 --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
 --! 
qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
 --! 
qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
--- Mask compaction id as they will be allocated in parallel threads
---! qt:replace:/^[0-9]/#Masked#/
 -- Mask removed file size
 --! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
 -- Mask iceberg version
@@ -47,7 +45,7 @@ partitioned by (company_id bigint)
 stored by iceberg stored as orc;
 
 -----------------------------------------------------------------------------
---! Creating  table with a valid catalog name in table properties
+--! Creating a table with a valid catalog name in table properties
 -----------------------------------------------------------------------------
 
 create table ice_orc2 (
@@ -69,6 +67,31 @@ VALUES ('fn1','ln1', 1, 10), ('fn2','ln2', 2, 20), 
('fn3','ln3', 3, 30);
 describe formatted ice_orc2;
 select * from ice_orc2;
 
+-----------------------------------------------------------------------------------------------
+--! Iceberg view with TBLPROPERTIES ('view-format'='iceberg') on a REST 
catalog table
+-----------------------------------------------------------------------------------------------
+
+create view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name, last_name from ice_orc2 where dept_id in (1,2);
+
+select * from ice_v1;
+desc formatted ice_v1;
+drop view ice_v1;
+
+-----------------------------------------------------------------------------------------------
+--! Iceberg view: 'view-format' table properly omitted, Hive config 
'hive.default.storage.handler.class' 
+--! set to 'HiveIcebergStorageHandler'
+-----------------------------------------------------------------------------------------------
+
+set 
hive.default.storage.handler.class=org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+
+create view ice_v2
+as select dept_id, team_id from ice_orc2 where company_id = 100;
+
+select * from ice_v2;
+desc formatted ice_v2;
+drop view ice_v2;
+
 -----------------------------------------------------------------------------
 
 show tables;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_view.q 
b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_view.q
new file mode 100644
index 00000000000..037b67371b1
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_view.q
@@ -0,0 +1,87 @@
+-- SORT_QUERY_RESULTS
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+
+create database ice_native_view_db;
+use ice_native_view_db;
+
+create table src_ice (
+    first_name string, 
+    last_name string
+ )
+partitioned by (dept_id bigint)
+stored by iceberg stored as orc;
+
+INSERT INTO src_ice VALUES
+  ('fn1','ln1', 1),
+  ('fn2','ln2', 1),
+  ('fn3','ln3', 1),
+  ('fn4','ln4', 1),
+  ('fn5','ln5', 2),
+  ('fn6','ln6', 2),
+  ('fn7','ln7', 2);
+
+-------------------------------------------------------------------------------
+-- Native Iceberg view via TBLPROPERTIES
+-------------------------------------------------------------------------------
+
+-- TEST VIEW CREATION --
+
+create view v_ice tblproperties ('view-format'='iceberg') 
+as select * from src_ice;
+
+select * from v_ice;
+
+-- TEST VIEW REPLACEMENT --
+
+create or replace view v_ice tblproperties ('view-format'='iceberg') 
+as select first_name || '-' || dept_id from src_ice where dept_id = 1;
+
+select * from v_ice;
+desc formatted v_ice;
+
+-------------------------------------------------------------------------------
+-- Native Iceberg view when default storage handler is Iceberg 
+-- and no 'view-format' property in TBLPROPERTIES
+-------------------------------------------------------------------------------
+
+set 
hive.default.storage.handler.class=org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+
+-- TEST VIEW CREATION WITH IF EXISTS --
+
+create view if not exists v_def 
+as select first_name, last_name, dept_id from src_ice where dept_id = 2;
+
+select * from v_def;
+
+-- TEST VIEW IS NOT CREATED BECAUSE IT ALREADY EXISTS --
+
+create view if not exists v_def 
+as select first_name, last_name, dept_id from src_ice;
+
+select * from v_def;
+
+desc formatted v_def;
+drop view v_def;
+
+-----------------------------------------------------------------------------------------
+-- Classic Hive view when the base table is Iceberg and default storage 
handler is unset
+-----------------------------------------------------------------------------------------
+
+set hive.default.storage.handler.class=;
+
+create view v_hive as select * from src_ice;
+select * from v_hive;
+desc formatted v_hive;
+drop view v_hive;
+
+-----------------------------------------------------------------------------------------
+-- Replace Iceberg logical view with a Hive-native logical view
+-----------------------------------------------------------------------------------------
+
+create or replace view v_ice 
+as select first_name from src_ice where dept_id = 2;
+
+select * from v_ice;
+desc formatted v_ice;
+drop view v_ice;
\ No newline at end of file
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/iceberg_view.q.out 
b/iceberg/iceberg-handler/src/test/results/positive/iceberg_view.q.out
new file mode 100644
index 00000000000..ad072ca1c23
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_view.q.out
@@ -0,0 +1,380 @@
+PREHOOK: query: create database ice_native_view_db
+PREHOOK: type: CREATEDATABASE
+PREHOOK: Output: database:ice_native_view_db
+POSTHOOK: query: create database ice_native_view_db
+POSTHOOK: type: CREATEDATABASE
+POSTHOOK: Output: database:ice_native_view_db
+PREHOOK: query: use ice_native_view_db
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:ice_native_view_db
+POSTHOOK: query: use ice_native_view_db
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:ice_native_view_db
+PREHOOK: query: create table src_ice (
+    first_name string, 
+    last_name string
+ )
+partitioned by (dept_id bigint)
+stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@src_ice
+POSTHOOK: query: create table src_ice (
+    first_name string, 
+    last_name string
+ )
+partitioned by (dept_id bigint)
+stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@src_ice
+PREHOOK: query: INSERT INTO src_ice VALUES
+  ('fn1','ln1', 1),
+  ('fn2','ln2', 1),
+  ('fn3','ln3', 1),
+  ('fn4','ln4', 1),
+  ('fn5','ln5', 2),
+  ('fn6','ln6', 2),
+  ('fn7','ln7', 2)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: ice_native_view_db@src_ice
+POSTHOOK: query: INSERT INTO src_ice VALUES
+  ('fn1','ln1', 1),
+  ('fn2','ln2', 1),
+  ('fn3','ln3', 1),
+  ('fn4','ln4', 1),
+  ('fn5','ln5', 2),
+  ('fn6','ln6', 2),
+  ('fn7','ln7', 2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: ice_native_view_db@src_ice
+PREHOOK: query: create view v_ice tblproperties ('view-format'='iceberg') 
+as select * from src_ice
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@v_ice
+POSTHOOK: query: create view v_ice tblproperties ('view-format'='iceberg') 
+as select * from src_ice
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@v_ice
+POSTHOOK: Lineage: v_ice.dept_id SIMPLE 
[(src_ice)src_ice.FieldSchema(name:dept_id, type:bigint, comment:null), ]
+POSTHOOK: Lineage: v_ice.first_name SIMPLE 
[(src_ice)src_ice.FieldSchema(name:first_name, type:string, comment:null), ]
+POSTHOOK: Lineage: v_ice.last_name SIMPLE 
[(src_ice)src_ice.FieldSchema(name:last_name, type:string, comment:null), ]
+PREHOOK: query: select * from v_ice
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Input: ice_native_view_db@v_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from v_ice
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Input: ice_native_view_db@v_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+fn1    ln1     1
+fn2    ln2     1
+fn3    ln3     1
+fn4    ln4     1
+fn5    ln5     2
+fn6    ln6     2
+fn7    ln7     2
+PREHOOK: query: create or replace view v_ice tblproperties 
('view-format'='iceberg') 
+as select first_name || '-' || dept_id from src_ice where dept_id = 1
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@v_ice
+POSTHOOK: query: create or replace view v_ice tblproperties 
('view-format'='iceberg') 
+as select first_name || '-' || dept_id from src_ice where dept_id = 1
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@v_ice
+PREHOOK: query: select * from v_ice
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Input: ice_native_view_db@v_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from v_ice
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Input: ice_native_view_db@v_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+fn1-1
+fn2-1
+fn3-1
+fn4-1
+PREHOOK: query: desc formatted v_ice
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_native_view_db@v_ice
+POSTHOOK: query: desc formatted v_ice
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_native_view_db@v_ice
+# col_name             data_type               comment             
+_c0                    string                                      
+                
+# Detailed Table Information            
+Database:              ice_native_view_db       
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":1,\"fields\":[{\"id\":1,\"name\":\"_c0\",\"required\":false,\"type\":\"string\"}]}
+       metadata_location       hdfs://### HDFS PATH ###
+       previous_metadata_location      hdfs://### HDFS PATH ###
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       view-format             iceberg             
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `src_ice`.`first_name` || '-' || 
`src_ice`.`dept_id` from `ice_native_view_db`.`src_ice` where 
`src_ice`.`dept_id` = 1    
+Expanded Query:        select `src_ice`.`first_name` || '-' || 
`src_ice`.`dept_id` from `ice_native_view_db`.`src_ice` where 
`src_ice`.`dept_id` = 1    
+PREHOOK: query: create view if not exists v_def 
+as select first_name, last_name, dept_id from src_ice where dept_id = 2
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@v_def
+POSTHOOK: query: create view if not exists v_def 
+as select first_name, last_name, dept_id from src_ice where dept_id = 2
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@v_def
+POSTHOOK: Lineage: v_def.dept_id SIMPLE []
+POSTHOOK: Lineage: v_def.first_name SIMPLE 
[(src_ice)src_ice.FieldSchema(name:first_name, type:string, comment:null), ]
+POSTHOOK: Lineage: v_def.last_name SIMPLE 
[(src_ice)src_ice.FieldSchema(name:last_name, type:string, comment:null), ]
+PREHOOK: query: select * from v_def
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Input: ice_native_view_db@v_def
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from v_def
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Input: ice_native_view_db@v_def
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+fn5    ln5     2
+fn6    ln6     2
+fn7    ln7     2
+PREHOOK: query: create view if not exists v_def 
+as select first_name, last_name, dept_id from src_ice
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@v_def
+POSTHOOK: query: create view if not exists v_def 
+as select first_name, last_name, dept_id from src_ice
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@v_def
+PREHOOK: query: select * from v_def
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Input: ice_native_view_db@v_def
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from v_def
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Input: ice_native_view_db@v_def
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+fn5    ln5     2
+fn6    ln6     2
+fn7    ln7     2
+PREHOOK: query: desc formatted v_def
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_native_view_db@v_def
+POSTHOOK: query: desc formatted v_def
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_native_view_db@v_def
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+dept_id                bigint                                      
+                
+# Detailed Table Information            
+Database:              ice_native_view_db       
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"}]}
+       metadata_location       hdfs://### HDFS PATH ###
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+#### A masked pattern was here ####
+       uuid                    #Masked#
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `src_ice`.`first_name`, `src_ice`.`last_name`, 
`src_ice`.`dept_id` from `ice_native_view_db`.`src_ice` where 
`src_ice`.`dept_id` = 2      
+Expanded Query:        select `src_ice`.`first_name`, `src_ice`.`last_name`, 
`src_ice`.`dept_id` from `ice_native_view_db`.`src_ice` where 
`src_ice`.`dept_id` = 2      
+PREHOOK: query: drop view v_def
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_native_view_db@v_def
+PREHOOK: Output: ice_native_view_db@v_def
+POSTHOOK: query: drop view v_def
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_native_view_db@v_def
+POSTHOOK: Output: ice_native_view_db@v_def
+PREHOOK: query: create view v_hive as select * from src_ice
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@v_hive
+POSTHOOK: query: create view v_hive as select * from src_ice
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@v_hive
+POSTHOOK: Lineage: v_hive.dept_id SIMPLE 
[(src_ice)src_ice.FieldSchema(name:dept_id, type:bigint, comment:null), ]
+POSTHOOK: Lineage: v_hive.first_name SIMPLE 
[(src_ice)src_ice.FieldSchema(name:first_name, type:string, comment:null), ]
+POSTHOOK: Lineage: v_hive.last_name SIMPLE 
[(src_ice)src_ice.FieldSchema(name:last_name, type:string, comment:null), ]
+PREHOOK: query: select * from v_hive
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Input: ice_native_view_db@v_hive
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from v_hive
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Input: ice_native_view_db@v_hive
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+fn1    ln1     1
+fn2    ln2     1
+fn3    ln3     1
+fn4    ln4     1
+fn5    ln5     2
+fn6    ln6     2
+fn7    ln7     2
+PREHOOK: query: desc formatted v_hive
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_native_view_db@v_hive
+POSTHOOK: query: desc formatted v_hive
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_native_view_db@v_hive
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+dept_id                bigint                                      
+                
+# Detailed Table Information            
+Database:              ice_native_view_db       
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+#### A masked pattern was here ####
+                
+# Storage Information           
+SerDe Library:         null                     
+InputFormat:           org.apache.hadoop.mapred.TextInputFormat         
+OutputFormat:          
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat       
+Compressed:            No                       
+Num Buckets:           -1                       
+Bucket Columns:        []                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select * from src_ice    
+Expanded Query:        select `src_ice`.`first_name`, `src_ice`.`last_name`, 
`src_ice`.`dept_id` from `ice_native_view_db`.`src_ice`    
+PREHOOK: query: drop view v_hive
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_native_view_db@v_hive
+PREHOOK: Output: ice_native_view_db@v_hive
+POSTHOOK: query: drop view v_hive
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_native_view_db@v_hive
+POSTHOOK: Output: ice_native_view_db@v_hive
+PREHOOK: query: create or replace view v_ice 
+as select first_name from src_ice where dept_id = 2
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Output: database:ice_native_view_db
+PREHOOK: Output: ice_native_view_db@v_ice
+POSTHOOK: query: create or replace view v_ice 
+as select first_name from src_ice where dept_id = 2
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Output: database:ice_native_view_db
+POSTHOOK: Output: ice_native_view_db@v_ice
+PREHOOK: query: select * from v_ice
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_native_view_db@src_ice
+PREHOOK: Input: ice_native_view_db@v_ice
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from v_ice
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_native_view_db@src_ice
+POSTHOOK: Input: ice_native_view_db@v_ice
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+fn5
+fn6
+fn7
+PREHOOK: query: desc formatted v_ice
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_native_view_db@v_ice
+POSTHOOK: query: desc formatted v_ice
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_native_view_db@v_ice
+# col_name             data_type               comment             
+first_name             string                                      
+                
+# Detailed Table Information            
+Database:              ice_native_view_db       
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":1,\"fields\":[{\"id\":1,\"name\":\"_c0\",\"required\":false,\"type\":\"string\"}]}
+       metadata_location       hdfs://### HDFS PATH ###
+       previous_metadata_location      hdfs://### HDFS PATH ###
+       table_type              ICEBERG-VIEW        
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       view-format             iceberg             
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Num Buckets:           0                        
+Bucket Columns:        []                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select first_name from src_ice where dept_id = 2        
 
+Expanded Query:        select `src_ice`.`first_name` from 
`ice_native_view_db`.`src_ice` where `src_ice`.`dept_id` = 2  
+PREHOOK: query: drop view v_ice
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_native_view_db@v_ice
+PREHOOK: Output: ice_native_view_db@v_ice
+POSTHOOK: query: drop view v_ice
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_native_view_db@v_ice
+POSTHOOK: Output: ice_native_view_db@v_ice
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
index 8bba659e8fd..05c7e7bcfb4 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_gravitino.q.out
@@ -68,8 +68,6 @@ CREATE EXTERNAL TABLE `ice_orc2`(
   `dept_id` bigint, 
   `team_id` bigint, 
   `company_id` bigint)
-PARTITIONED BY ( 
-  `company_id` bigint COMMENT 'Transform: identity')
 PARTITIONED BY SPEC ( 
 `company_id`)
 ROW FORMAT SERDE 
@@ -180,6 +178,259 @@ POSTHOOK: Input: ice_rest@ice_orc2
 fn1    ln1     1       10      100
 fn2    ln2     2       20      100
 fn3    ln3     3       30      100
+PREHOOK: query: create view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name, last_name from ice_orc2 where dept_id in (1, 3)
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_v1
+POSTHOOK: query: create view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name, last_name from ice_orc2 where dept_id in (1, 3)
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_v1
+POSTHOOK: Lineage: ice_v1.first_name SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:first_name, type:string, comment:null), ]
+POSTHOOK: Lineage: ice_v1.last_name SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:last_name, type:string, comment:null), ]
+PREHOOK: query: select * from ice_v1
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_v1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+fn1    ln1
+fn3    ln3
+PREHOOK: query: desc formatted ice_v1
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_rest@ice_v1
+POSTHOOK: query: desc formatted ice_v1
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_rest@ice_v1
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+                
+# Detailed Table Information            
+Database:              ice_rest                 
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"}]}
+#### A masked pattern was here ####
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+       type                    rest                
+       uuid                    #Masked#
+       view-format             iceberg             
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` in (1, 3)   
+Expanded Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` in (1, 3)   
+PREHOOK: query: create view if not exists ice_v1 tblproperties 
('view-format'='iceberg')
+as select * from ice_orc2 where dept_id = 10000
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_v1
+POSTHOOK: query: create view if not exists ice_v1 tblproperties 
('view-format'='iceberg')
+as select * from ice_orc2 where dept_id = 10000
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_v1
+PREHOOK: query: select * from ice_v1
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_v1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+fn1    ln1
+fn3    ln3
+PREHOOK: query: desc formatted ice_v1
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_rest@ice_v1
+POSTHOOK: query: desc formatted ice_v1
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_rest@ice_v1
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+                
+# Detailed Table Information            
+Database:              ice_rest                 
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"}]}
+#### A masked pattern was here ####
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+       type                    rest                
+       uuid                    #Masked#
+       view-format             iceberg             
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` in (1, 3)   
+Expanded Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` in (1, 3)   
+PREHOOK: query: create or replace view ice_v1 tblproperties 
('view-format'='iceberg')
+as select first_name || '-' || dept_id from ice_orc2 where dept_id = 2
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_v1
+POSTHOOK: query: create or replace view ice_v1 tblproperties 
('view-format'='iceberg')
+as select first_name || '-' || dept_id from ice_orc2 where dept_id = 2
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_v1
+PREHOOK: query: select * from ice_v1
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_v1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+fn2-2
+PREHOOK: query: desc formatted ice_v1
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_rest@ice_v1
+POSTHOOK: query: desc formatted ice_v1
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_rest@ice_v1
+# col_name             data_type               comment             
+_c0                    string                                      
+                
+# Detailed Table Information            
+Database:              ice_rest                 
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":1,\"fields\":[{\"id\":1,\"name\":\"_c0\",\"required\":false,\"type\":\"string\"}]}
+#### A masked pattern was here ####
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+       type                    rest                
+       uuid                    #Masked#
+       view-format             iceberg             
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `ice_orc2`.`first_name` || '-' || 
`ice_orc2`.`dept_id` from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` = 2  
+Expanded Query:        select `ice_orc2`.`first_name` || '-' || 
`ice_orc2`.`dept_id` from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` = 2  
+PREHOOK: query: drop view ice_v1
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_rest@ice_v1
+PREHOOK: Output: ice_rest@ice_v1
+POSTHOOK: query: drop view ice_v1
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_rest@ice_v1
+POSTHOOK: Output: ice_rest@ice_v1
+PREHOOK: query: create view ice_v2
+as select first_name, last_name || '-' || dept_id from ice_orc2 where team_id 
in (20, 30)
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_v2
+POSTHOOK: query: create view ice_v2
+as select first_name, last_name || '-' || dept_id from ice_orc2 where team_id 
in (20, 30)
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_v2
+POSTHOOK: Lineage: ice_v2._c1 EXPRESSION 
[(ice_orc2)ice_orc2.FieldSchema(name:last_name, type:string, comment:null), 
(ice_orc2)ice_orc2.FieldSchema(name:dept_id, type:bigint, comment:null), ]
+POSTHOOK: Lineage: ice_v2.first_name SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:first_name, type:string, comment:null), ]
+PREHOOK: query: select * from ice_v2
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Input: ice_rest@ice_v2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_v2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Input: ice_rest@ice_v2
+#### A masked pattern was here ####
+fn2    ln2-2
+fn3    ln3-3
+PREHOOK: query: desc formatted ice_v2
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_rest@ice_v2
+POSTHOOK: query: desc formatted ice_v2
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_rest@ice_v2
+# col_name             data_type               comment             
+first_name             string                                      
+_c1                    string                                      
+                
+# Detailed Table Information            
+Database:              ice_rest                 
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"_c1\",\"required\":false,\"type\":\"string\"}]}
+#### A masked pattern was here ####
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+       type                    rest                
+       uuid                    #Masked#
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
|| '-' || `ice_orc2`.`dept_id` from `ice_rest`.`ice_orc2` where 
`ice_orc2`.`team_id` in (20, 30)  
+Expanded Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
|| '-' || `ice_orc2`.`dept_id` from `ice_rest`.`ice_orc2` where 
`ice_orc2`.`team_id` in (20, 30)  
+PREHOOK: query: drop view ice_v2
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_rest@ice_v2
+PREHOOK: Output: ice_rest@ice_v2
+POSTHOOK: query: drop view ice_v2
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_rest@ice_v2
+POSTHOOK: Output: ice_rest@ice_v2
 PREHOOK: query: show tables
 PREHOOK: type: SHOWTABLES
 PREHOOK: Input: database:ice_rest
diff --git 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
index 409eb484480..a92beb820d1 100644
--- 
a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
+++ 
b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_rest_catalog_hms.q.out
@@ -68,8 +68,6 @@ CREATE EXTERNAL TABLE `ice_orc2`(
   `dept_id` bigint, 
   `team_id` bigint, 
   `company_id` bigint)
-PARTITIONED BY ( 
-  `company_id` bigint COMMENT 'Transform: identity')
 PARTITIONED BY SPEC ( 
 `company_id`)
 ROW FORMAT SERDE 
@@ -180,6 +178,144 @@ POSTHOOK: Input: ice_rest@ice_orc2
 fn1    ln1     1       10      100
 fn2    ln2     2       20      100
 fn3    ln3     3       30      100
+PREHOOK: query: create view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name, last_name from ice_orc2 where dept_id in (1,2)
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_v1
+POSTHOOK: query: create view ice_v1 tblproperties ('view-format'='iceberg')
+as select first_name, last_name from ice_orc2 where dept_id in (1,2)
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_v1
+POSTHOOK: Lineage: ice_v1.first_name SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:first_name, type:string, comment:null), ]
+POSTHOOK: Lineage: ice_v1.last_name SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:last_name, type:string, comment:null), ]
+PREHOOK: query: select * from ice_v1
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_v1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Input: ice_rest@ice_v1
+#### A masked pattern was here ####
+fn1    ln1
+fn2    ln2
+PREHOOK: query: desc formatted ice_v1
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_rest@ice_v1
+POSTHOOK: query: desc formatted ice_v1
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_rest@ice_v1
+# col_name             data_type               comment             
+first_name             string                                      
+last_name              string                                      
+                
+# Detailed Table Information            
+Database:              ice_rest                 
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"}]}
+#### A masked pattern was here ####
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+       type                    rest                
+       uuid                    #Masked#
+       view-format             iceberg             
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` in (1,2)    
+Expanded Query:        select `ice_orc2`.`first_name`, `ice_orc2`.`last_name` 
from `ice_rest`.`ice_orc2` where `ice_orc2`.`dept_id` in (1,2)    
+PREHOOK: query: drop view ice_v1
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_rest@ice_v1
+PREHOOK: Output: ice_rest@ice_v1
+POSTHOOK: query: drop view ice_v1
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_rest@ice_v1
+POSTHOOK: Output: ice_rest@ice_v1
+PREHOOK: query: create view ice_v2
+as select dept_id, team_id from ice_orc2 where company_id = 100
+PREHOOK: type: CREATEVIEW
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Output: database:ice_rest
+PREHOOK: Output: ice_rest@ice_v2
+POSTHOOK: query: create view ice_v2
+as select dept_id, team_id from ice_orc2 where company_id = 100
+POSTHOOK: type: CREATEVIEW
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Output: database:ice_rest
+POSTHOOK: Output: ice_rest@ice_v2
+POSTHOOK: Lineage: ice_v2.dept_id SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:dept_id, type:bigint, comment:null), ]
+POSTHOOK: Lineage: ice_v2.team_id SIMPLE 
[(ice_orc2)ice_orc2.FieldSchema(name:team_id, type:bigint, comment:null), ]
+PREHOOK: query: select * from ice_v2
+PREHOOK: type: QUERY
+PREHOOK: Input: ice_rest@ice_orc2
+PREHOOK: Input: ice_rest@ice_v2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from ice_v2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: ice_rest@ice_orc2
+POSTHOOK: Input: ice_rest@ice_v2
+#### A masked pattern was here ####
+1      10
+2      20
+3      30
+PREHOOK: query: desc formatted ice_v2
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: ice_rest@ice_v2
+POSTHOOK: query: desc formatted ice_v2
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: ice_rest@ice_v2
+# col_name             data_type               comment             
+dept_id                bigint                                      
+team_id                bigint                                      
+                
+# Detailed Table Information            
+Database:              ice_rest                 
+#### A masked pattern was here ####
+Retention:             0                        
+Table Type:            VIRTUAL_VIEW             
+Table Parameters:               
+       bucketing_version       2                   
+       current-schema          
{\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"}]}
+#### A masked pattern was here ####
+       storage_handler         
org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG-VIEW        
+       type                    rest                
+       uuid                    #Masked#
+                
+# Storage Information           
+SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe      
 
+InputFormat:           org.apache.hadoop.mapred.FileInputFormat         
+OutputFormat:          org.apache.hadoop.mapred.FileOutputFormat        
+Compressed:            No                       
+Sort Columns:          []                       
+                
+# View Information              
+Original Query:        select `ice_orc2`.`dept_id`, `ice_orc2`.`team_id` from 
`ice_rest`.`ice_orc2` where `ice_orc2`.`company_id` = 100         
+Expanded Query:        select `ice_orc2`.`dept_id`, `ice_orc2`.`team_id` from 
`ice_rest`.`ice_orc2` where `ice_orc2`.`company_id` = 100         
+PREHOOK: query: drop view ice_v2
+PREHOOK: type: DROPVIEW
+PREHOOK: Input: ice_rest@ice_v2
+PREHOOK: Output: ice_rest@ice_v2
+POSTHOOK: query: drop view ice_v2
+POSTHOOK: type: DROPVIEW
+POSTHOOK: Input: ice_rest@ice_v2
+POSTHOOK: Output: ice_rest@ice_v2
 PREHOOK: query: show tables
 PREHOOK: type: SHOWTABLES
 PREHOOK: Input: database:ice_rest
diff --git 
a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
 
b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
index fd30223c993..03885a54e1c 100644
--- 
a/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
+++ 
b/itests/hive-iceberg/src/test/java/org/apache/hive/TestHiveRESTCatalogClientITBase.java
@@ -30,7 +30,9 @@
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -38,13 +40,15 @@
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.hive.IcebergCatalogProperties;
+import org.apache.iceberg.hive.HiveOperationsBase;
 import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.hive.IcebergCatalogProperties;
 import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -67,6 +71,8 @@
 public abstract class TestHiveRESTCatalogClientITBase {
 
   static final String DB_NAME = "ice_db";
+  static final String VIEW_DB_NAME = "ice_db_view";
+  static final String NATIVE_VIEW_NAME = "native_rest_v";
   static final String TABLE_NAME = "ice_tbl";
   static final String CATALOG_NAME = "ice01";
   static final String HIVE_ICEBERG_STORAGE_HANDLER = 
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
@@ -109,6 +115,20 @@ void setup() throws Exception {
     msClient = new HiveMetaStoreClient(conf, hookLoader);
     hiveConf = new HiveConf(conf, HiveConf.class);
     hive = Hive.get(hiveConf);
+    dropDatabaseIfExists(VIEW_DB_NAME);
+  }
+
+  private void dropDatabaseIfExists(String dbName) {
+    try {
+      msClient.dropTable(CATALOG_NAME, dbName, NATIVE_VIEW_NAME);
+    } catch (Exception ignored) {
+      // view may not exist
+    }
+    try {
+      msClient.dropDatabase(dbName);
+    } catch (Exception ignored) {
+      // database may not exist
+    }
   }
 
   @AfterEach
@@ -173,8 +193,14 @@ public void testIceberg() throws Exception {
     Assertions.assertEquals(TABLE_NAME, table.getTableName());
     Assertions.assertEquals(HIVE_ICEBERG_STORAGE_HANDLER, 
table.getParameters().get("storage_handler"));
     
Assertions.assertNotNull(table.getParameters().get(TableProperties.DEFAULT_PARTITION_SPEC));
-    Assertions.assertEquals(1, table.getPartitionKeys().size());
-    Assertions.assertEquals("city", 
table.getPartitionKeys().getFirst().getName());
+    
+    // TODO: Revert after HIVE-29633 is fixed
+    // Assertions.assertEquals(1, table.getPartitionKeys().size());
+    Assertions.assertTrue(table.getPartitionKeys().isEmpty());
+    
+    List<String> columnNames =
+        table.getSd().getCols().stream().map(FieldSchema::getName).toList();
+    Assertions.assertTrue(columnNames.contains("city"));
     
     // --- Get Tables ---
     List<String> tables = msClient.getTables(CATALOG_NAME, DB_NAME, "ice_*");
@@ -195,7 +221,69 @@ public void testIceberg() throws Exception {
     
Assertions.assertFalse(msClient.getAllDatabases(CATALOG_NAME).contains(DB_NAME));
   }
 
-  private static Table createPartitionedTable(IMetaStoreClient db, String 
catName, String dbName, String tableName, 
+  @Test
+  public void testIcebergView() throws Exception {
+    Database db = new Database();
+    db.setCatalogName(CATALOG_NAME);
+    db.setName(VIEW_DB_NAME);
+    db.setOwnerType(PrincipalType.USER);
+    db.setOwnerName(System.getProperty("user.name"));
+    String warehouseDir = MetastoreConf.get(conf, 
MetastoreConf.ConfVars.WAREHOUSE_EXTERNAL.getVarname());
+    db.setLocationUri(warehouseDir + "/" + VIEW_DB_NAME + ".db");
+    hive.createDatabase(db, true);
+
+    List<FieldSchema> cols = Collections.singletonList(new FieldSchema("x", 
"int", ""));
+    createIcebergView(VIEW_DB_NAME, NATIVE_VIEW_NAME, cols, "select 1 as x", 
"rest-native-view");
+
+    Assertions.assertTrue(msClient.tableExists(CATALOG_NAME, VIEW_DB_NAME, 
NATIVE_VIEW_NAME));
+    
+    GetTableRequest getTableRequest = new GetTableRequest();
+    getTableRequest.setCatName(CATALOG_NAME);
+    getTableRequest.setDbName(VIEW_DB_NAME);
+    getTableRequest.setTblName(NATIVE_VIEW_NAME);
+    Table view = msClient.getTable(getTableRequest);
+    
+    Assertions.assertEquals(TableType.VIRTUAL_VIEW.name(), 
view.getTableType());
+    String tableTypeProp = 
view.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+    Assertions.assertNotNull(tableTypeProp);
+    Assertions.assertEquals(HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE, 
tableTypeProp.toLowerCase());
+
+    List<String> names = msClient.getTables(CATALOG_NAME, VIEW_DB_NAME, "*");
+    Assertions.assertTrue(names.contains(NATIVE_VIEW_NAME));
+
+    msClient.dropTable(CATALOG_NAME, VIEW_DB_NAME, NATIVE_VIEW_NAME);
+    Assertions.assertFalse(msClient.tableExists(CATALOG_NAME, VIEW_DB_NAME, 
NATIVE_VIEW_NAME));
+
+    msClient.dropDatabase(VIEW_DB_NAME);
+  }
+
+  private void createIcebergView(
+      String dbName, String viewName, List<FieldSchema> cols, String viewSql, 
String comment) throws Exception {
+    Table view = new Table();
+    view.setCatName(CATALOG_NAME);
+    view.setDbName(dbName);
+    view.setTableName(viewName);
+    view.setTableType(TableType.VIRTUAL_VIEW.toString());
+    view.setViewOriginalText(viewSql);
+    view.setViewExpandedText(viewSql);
+
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setCols(cols);
+    sd.setSerdeInfo(new SerDeInfo());
+    sd.getSerdeInfo().setParameters(new java.util.HashMap<>());
+    view.setSd(sd);
+
+    view.setParameters(new java.util.HashMap<>());
+    view.getParameters().put(hive_metastoreConstants.META_TABLE_STORAGE, 
HIVE_ICEBERG_STORAGE_HANDLER);
+    view.getParameters().put("view-format", "iceberg");
+    if (comment != null) {
+      view.getParameters().put("comment", comment);
+    }
+
+    msClient.createTable(view);
+  }
+
+  private static Table createPartitionedTable(IMetaStoreClient db, String 
catName, String dbName, String tableName,
       Map<String, String> tableParameters) throws Exception {
     db.dropTable(catName, dbName, tableName);
     Table table = new Table();
diff --git 
a/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseCreateView.java 
b/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseCreateView.java
new file mode 100644
index 00000000000..22c31f74d0b
--- /dev/null
+++ b/parser/src/test/org/apache/hadoop/hive/ql/parse/TestParseCreateView.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestParseCreateView {
+
+  private final ParseDriver parseDriver = new ParseDriver();
+
+  @Test
+  public void testParseCreateViewWithTableProperties() throws Exception {
+    ASTNode tree =
+        parseDriver
+            .parse(
+                "create view v1 tblproperties ('view-format'='iceberg') as 
select * from t", null)
+            .getTree();
+    assertTrue(tree.dump(), tree.toStringTree().contains("tok_createview"));
+    assertTrue(tree.dump(), 
tree.toStringTree().contains("tok_tableproperties"));
+    assertTrue(tree.dump(), tree.toStringTree().contains("view-format"));
+    assertTrue(tree.dump(), tree.toStringTree().contains("iceberg"));
+  }
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewAnalyzer.java
index 60b123dbb7b..38d4081f520 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewAnalyzer.java
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.hive.ql.ddl.DDLUtils;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
@@ -41,12 +44,16 @@
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.StorageFormat;
 
 /**
  * Analyzer for create view commands.
  */
 @DDLType(types = HiveParser.TOK_CREATEVIEW)
 public class CreateViewAnalyzer extends AbstractCreateViewAnalyzer {
+
+  private static final String VIEW_FORMAT_TABLE_PROPERTY = "view-format";
+
   public CreateViewAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
   }
@@ -75,6 +82,14 @@ public void analyzeInternal(ASTNode root) throws 
SemanticException {
     List<String> partitionColumnNames = 
children.containsKey(HiveParser.TOK_VIEWPARTCOLS) ?
         getColumnNames((ASTNode) 
children.remove(HiveParser.TOK_VIEWPARTCOLS).getChild(0)) : null;
 
+    String storageHandlerClassFromTableProps = 
getViewStorageHandlerClassFromTableProps(properties);
+    String storageHandlerClass = 
resolveViewStorageHandlerClass(storageHandlerClassFromTableProps);
+
+    if (storageHandlerClassFromTableProps != null && storageHandlerClass == 
null) {
+      throw new SemanticException(
+          
ErrorMsg.VIEW_STORAGE_HANDLER_UNSUPPORTED.format(storageHandlerClassFromTableProps));
+    }
+
     assert children.isEmpty();
 
     if (ifNotExists && orReplace) {
@@ -94,7 +109,7 @@ public void analyzeInternal(ASTNode root) throws 
SemanticException {
     List<FieldSchema> partitionColumns = 
getPartitionColumns(partitionColumnNames);
     setColumnAccessInfo(analyzer.getColumnAccessInfo());
     CreateViewDesc desc = new CreateViewDesc(fqViewName, schema, comment, 
properties, partitionColumnNames,
-        ifNotExists, orReplace, originalText, expandedText, partitionColumns);
+        ifNotExists, orReplace, originalText, expandedText, partitionColumns, 
storageHandlerClass);
     validateCreateView(desc, analyzer);
 
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), 
desc)));
@@ -191,6 +206,52 @@ private List<FieldSchema> getPartitionColumns(List<String> 
partitionColumnNames)
     return partitionColumnsCopy;
   }
 
+  /**
+   * Returns the FQCN of the storage handler that should own external logical 
view metadata, or {@code null} for a
+   * classic HMS virtual view. Uses {@code storageHandlerClassFromTableProps} 
when non-null (from the 
+   * {@code view-format} table property); otherwise the Hive config {@code 
hive.default.storage.handler.class}.
+   */
+  private String resolveViewStorageHandlerClass(String 
storageHandlerClassFromTableProps)
+      throws SemanticException {
+
+    String storageHandlerClassFromConfig =
+        StringUtils.trimToNull(HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_DEFAULT_STORAGE_HANDLER));
+
+    String storageHandlerClass =
+        storageHandlerClassFromTableProps != null ? 
storageHandlerClassFromTableProps : storageHandlerClassFromConfig;
+
+    if (StringUtils.isBlank(storageHandlerClass)) {
+      return null;
+    }
+
+    try {
+      HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(conf, 
storageHandlerClass);
+
+      if (storageHandler != null && 
storageHandler.supportsExternalViewCatalog()) {
+        return storageHandlerClass;
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+
+    return null;
+  }
+
+  private static String getViewStorageHandlerClassFromTableProps(Map<String, 
String> properties) 
+      throws SemanticException {
+    if (properties == null) {
+      return null;
+    }
+    for (Map.Entry<String, String> e : properties.entrySet()) {
+      if (e.getKey() != null
+          && VIEW_FORMAT_TABLE_PROPERTY.equalsIgnoreCase(e.getKey())
+          && StringUtils.isNotBlank(e.getValue())) {
+        return 
StorageFormat.resolveStorageHandlerClassName(e.getValue().trim());
+      }
+    }
+    return null;
+  }
+
   private void validateCreateView(CreateViewDesc desc, SemanticAnalyzer 
analyzer) throws SemanticException {
     try {
       validateTablesUsed(analyzer);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
index e71cbce773f..bcbe536bf55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -40,13 +41,14 @@ public class CreateViewDesc extends AbstractCreateViewDesc {
   private final boolean ifNotExists;
   private final boolean replace;
   private final List<FieldSchema> partitionColumns;
+  private final String storageHandlerClass;
 
   private ReplicationSpec replicationSpec = null;
   private String ownerName = null;
 
   public CreateViewDesc(String viewName, List<FieldSchema> schema, String 
comment, Map<String, String> properties,
       List<String> partitionColumnNames, boolean ifNotExists, boolean replace, 
String originalText,
-      String expandedText, List<FieldSchema> partitionColumns) {
+      String expandedText, List<FieldSchema> partitionColumns, String 
storageHandlerClass) {
     super(viewName, schema, originalText, expandedText);
     this.comment = comment;
     this.properties = properties;
@@ -54,6 +56,7 @@ public CreateViewDesc(String viewName, List<FieldSchema> 
schema, String comment,
     this.ifNotExists = ifNotExists;
     this.replace = replace;
     this.partitionColumns = partitionColumns;
+    this.storageHandlerClass = storageHandlerClass;
   }
 
   @Explain(displayName = "partition columns")
@@ -89,6 +92,19 @@ public boolean isReplace() {
     return replace;
   }
 
+  /**
+   * @return FQCN of the {@link 
org.apache.hadoop.hive.ql.metadata.HiveStorageHandler} that stores view 
metadata in an
+   *         external catalog, or {@code null} for a classic HMS-only virtual 
view.
+   */
+  @Explain(displayName = "external logical view storage handler", 
displayOnlyOnTrue = true)
+  public String getStorageHandlerClass() {
+    return storageHandlerClass;
+  }
+
+  public boolean usesStorageHandler() {
+    return !StringUtils.isBlank(storageHandlerClass);
+  }
+
   /**
    * @param replicationSpec Sets the replication spec governing this create.
    * This parameter will have meaningful values only for creates happening as 
a result of a replication.
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewOperation.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewOperation.java
index b6a41a8d4fb..fe58ff0aa64 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewOperation.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewOperation.java
@@ -82,6 +82,10 @@ public int execute() throws HiveException {
       if (desc.getProperties() != null) {
         oldview.getTTable().getParameters().putAll(desc.getProperties());
       }
+      if (!desc.usesStorageHandler()) {
+        // External view is replaced with a native Hive view
+        clearStorageHandlerProp(oldview);
+      }
       oldview.setPartCols(desc.getPartitionColumns());
 
       oldview.checkValidity(null);
@@ -105,6 +109,18 @@ public int execute() throws HiveException {
     return 0;
   }
 
+  private void clearStorageHandlerProp(Table oldview) {
+    Map<String, String> params = oldview.getParameters();
+    if (params == null) {
+      return;
+    }
+    String fqcn = 
params.get(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+    if (fqcn == null) {
+      return;
+    }
+    
params.remove(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE);
+  }
+
   private Table createViewObject() throws HiveException {
     TableName name = HiveTableName.of(desc.getViewName());
     Table view = new Table(name.getDb(), name.getTable());
@@ -129,6 +145,13 @@ private Table createViewObject() throws HiveException {
     StorageFormat storageFormat = new StorageFormat(context.getConf());
     storageFormat.fillDefaultStorageFormat(false, false);
 
+    if (desc.usesStorageHandler()) {
+      storageFormat.setStorageHandler(desc.getStorageHandlerClass());
+      view.setProperty(
+          
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+          desc.getStorageHandlerClass().trim());
+    }
+    
     view.setInputFormatClass(storageFormat.getInputFormat());
     view.setOutputFormatClass(storageFormat.getOutputFormat());
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 9287fd75e76..787cadd256b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -547,8 +547,9 @@ public static Task<?> createViewTask(MetaData metaData, 
String dbNameToLoadIn, H
       // texts using new DB name. Currently it refers to the source database 
name.
     }
 
-    CreateViewDesc desc = new CreateViewDesc(dbDotView, table.getCols(), null, 
table.getParameters(),
-            table.getPartColNames(), false, false, viewOriginalText, 
viewExpandedText, table.getPartCols());
+    CreateViewDesc desc = new CreateViewDesc(dbDotView, table.getCols(), null, 
table.getParameters(), 
+        table.getPartColNames(), false, false, viewOriginalText, 
viewExpandedText, 
+        table.getPartCols(), null);
 
     desc.setReplicationSpec(metaData.getReplicationSpec());
     desc.setOwnerName(table.getOwner());
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/CreateExternalViewRequest.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/CreateExternalViewRequest.java
new file mode 100644
index 00000000000..88631386999
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/CreateExternalViewRequest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.metadata;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+
+public final class CreateExternalViewRequest implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final String databaseName;
+  private final String viewName;
+  private final List<FieldSchema> schema;
+  private final String expandedText;
+  private final Map<String, String> properties;
+  private final String comment;
+  private final boolean replace;
+  private final boolean ifNotExists;
+
+  public CreateExternalViewRequest(
+      String databaseName,
+      String viewName,
+      List<FieldSchema> schema,
+      String expandedText,
+      Map<String, String> properties,
+      String comment,
+      boolean replace,
+      boolean ifNotExists) {
+    this.databaseName = databaseName;
+    this.viewName = viewName;
+    this.schema = schema;
+    this.expandedText = expandedText;
+    this.properties = properties == null ? null : 
Collections.unmodifiableMap(properties);
+    this.comment = comment;
+    this.replace = replace;
+    this.ifNotExists = ifNotExists;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getViewName() {
+    return viewName;
+  }
+
+  public List<FieldSchema> getSchema() {
+    return schema;
+  }
+
+  public String getExpandedText() {
+    return expandedText;
+  }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
+
+  public String getComment() {
+    return comment;
+  }
+
+  public boolean isReplace() {
+    return replace;
+  }
+
+  public boolean isIfNotExists() {
+    return ifNotExists;
+  }
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 520c52a24a8..c50f0b33b05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -1028,6 +1028,14 @@ default void setMergeTaskDeleteProperties(TableDesc 
tableDesc) {
     throw new UnsupportedOperationException("Storage handler does not support 
getting custom delete merge schema.");
   }
 
+  /**
+   * @return {@code true} if this handler may store CREATE VIEW text and 
column metadata in an external catalog
+   *         rather than only as a classic HMS virtual view.
+   */
+  default boolean supportsExternalViewCatalog() {
+    return false;
+  }
+
   default boolean supportsDefaultColumnValues(Map<String, String> tblProps) {
     return false;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java
index 2472ad44ad0..7fb5269ec25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/StorageFormat.java
@@ -88,6 +88,19 @@ public String outputFormat() {
       return outputFormat;
     }
   }
+  
+  public static String resolveStorageHandlerClassName(String formatType) 
throws SemanticException {
+    if (StringUtils.isBlank(formatType)) {
+      throw new SemanticException("Format type cannot be empty");
+    }
+    for (StorageHandlerTypes type : StorageHandlerTypes.NON_DEFAULT_TYPES) {
+      if (type.name().equalsIgnoreCase(formatType.trim())) {
+        Objects.requireNonNull(type.className());
+        return 
ensureClassExists(BaseSemanticAnalyzer.unescapeSQLString(type.className()));
+      }
+    }
+    return 
ensureClassExists(BaseSemanticAnalyzer.unescapeSQLString(formatType.trim()));
+  }
 
   public StorageFormat(Configuration conf) {
     this.conf = conf;

Reply via email to