This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6100efc1a7 Core: Add remaining View APIs and support for InMemoryCatalog (#7880)
6100efc1a7 is described below

commit 6100efc1a764b204e5a879e0ae6d535a71301249
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Sep 29 00:24:07 2023 +0200

    Core: Add remaining View APIs and support for InMemoryCatalog (#7880)
---
 .../apache/iceberg/inmemory/InMemoryCatalog.java   |  298 +++--
 .../iceberg/view/BaseMetastoreViewCatalog.java     |  220 ++++
 .../java/org/apache/iceberg/view/BaseView.java     |   89 ++
 .../apache/iceberg/view/BaseViewOperations.java    |  220 ++++
 .../org/apache/iceberg/view/PropertiesUpdate.java  |   99 ++
 .../java/org/apache/iceberg/view/ViewMetadata.java |   12 +
 .../org/apache/iceberg/view/ViewOperations.java    |   61 +
 .../java/org/apache/iceberg/view/ViewUtil.java     |   29 +
 .../apache/iceberg/view/ViewVersionReplace.java    |  128 ++
 .../iceberg/inmemory/TestInMemoryViewCatalog.java  |   49 +
 .../org/apache/iceberg/view/TestViewMetadata.java  |   31 +
 .../iceberg/view/TestViewMetadataParser.java       |   82 ++
 .../org/apache/iceberg/view/ViewCatalogTests.java  | 1340 ++++++++++++++++++++
 .../view/ViewMetadataMultipleSQLsForDialect.json   |   63 +
 14 files changed, 2649 insertions(+), 72 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
index 3956e9192a..566e56eec9 100644
--- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
-import org.apache.iceberg.BaseMetastoreCatalog;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -42,23 +41,30 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.view.BaseMetastoreViewCatalog;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewUtil;
 
 /**
  * Catalog implementation that uses in-memory data-structures to store the namespaces and tables.
  * This class doesn't touch external resources and can be utilized to write unit tests without side
  * effects. It uses {@link InMemoryFileIO}.
  */
-public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Closeable {
+public class InMemoryCatalog extends BaseMetastoreViewCatalog
+    implements SupportsNamespaces, Closeable {
   private static final Joiner SLASH = Joiner.on("/");
   private static final Joiner DOT = Joiner.on(".");
 
   private final ConcurrentMap<Namespace, Map<String, String>> namespaces;
   private final ConcurrentMap<TableIdentifier, String> tables;
+  private final ConcurrentMap<TableIdentifier, String> views;
   private FileIO io;
   private String catalogName;
   private String warehouseLocation;
@@ -66,6 +72,7 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
   public InMemoryCatalog() {
     this.namespaces = Maps.newConcurrentMap();
     this.tables = Maps.newConcurrentMap();
+    this.views = Maps.newConcurrentMap();
   }
 
   @Override
@@ -111,8 +118,10 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
       lastMetadata = null;
     }
 
-    if (null == tables.remove(tableIdentifier)) {
-      return false;
+    synchronized (this) {
+      if (null == tables.remove(tableIdentifier)) {
+        return false;
+      }
     }
 
     if (purge && lastMetadata != null) {
@@ -136,27 +145,29 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
   }
 
   @Override
-  public synchronized void renameTable(TableIdentifier from, TableIdentifier to) {
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
     if (from.equals(to)) {
       return;
     }
 
-    if (!namespaceExists(to.namespace())) {
-      throw new NoSuchNamespaceException(
-          "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
-    }
+    synchronized (this) {
+      if (!namespaceExists(to.namespace())) {
+        throw new NoSuchNamespaceException(
+            "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
+      }
 
-    String fromLocation = tables.get(from);
-    if (null == fromLocation) {
-      throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
-    }
+      String fromLocation = tables.get(from);
+      if (null == fromLocation) {
+        throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
+      }
 
-    if (tables.containsKey(to)) {
-      throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
-    }
+      if (tables.containsKey(to)) {
+        throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
+      }
 
-    tables.put(to, fromLocation);
-    tables.remove(from);
+      tables.put(to, fromLocation);
+      tables.remove(from);
+    }
   }
 
   @Override
@@ -166,12 +177,14 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
 
   @Override
   public void createNamespace(Namespace namespace, Map<String, String> metadata) {
-    if (namespaceExists(namespace)) {
-      throw new AlreadyExistsException(
-          "Cannot create namespace %s. Namespace already exists", namespace);
-    }
+    synchronized (this) {
+      if (namespaceExists(namespace)) {
+        throw new AlreadyExistsException(
+            "Cannot create namespace %s. Namespace already exists", namespace);
+      }
 
-    namespaces.put(namespace, ImmutableMap.copyOf(metadata));
+      namespaces.put(namespace, ImmutableMap.copyOf(metadata));
+    }
   }
 
   @Override
@@ -181,50 +194,59 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
 
   @Override
   public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
-    if (!namespaceExists(namespace)) {
-      return false;
-    }
+    synchronized (this) {
+      if (!namespaceExists(namespace)) {
+        return false;
+      }
 
-    List<TableIdentifier> tableIdentifiers = listTables(namespace);
-    if (!tableIdentifiers.isEmpty()) {
-      throw new NamespaceNotEmptyException(
-          "Namespace %s is not empty. Contains %d table(s).", namespace, tableIdentifiers.size());
-    }
+      List<TableIdentifier> tableIdentifiers = listTables(namespace);
+      if (!tableIdentifiers.isEmpty()) {
+        throw new NamespaceNotEmptyException(
+            "Namespace %s is not empty. Contains %d table(s).", namespace, tableIdentifiers.size());
+      }
 
-    return namespaces.remove(namespace) != null;
+      return namespaces.remove(namespace) != null;
+    }
   }
 
   @Override
   public boolean setProperties(Namespace namespace, Map<String, String> properties)
       throws NoSuchNamespaceException {
-    if (!namespaceExists(namespace)) {
-      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
-    }
+    synchronized (this) {
+      if (!namespaceExists(namespace)) {
+        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+      }
 
-    namespaces.computeIfPresent(
-        namespace,
-        (k, v) ->
-            ImmutableMap.<String, String>builder().putAll(v).putAll(properties).buildKeepingLast());
+      namespaces.computeIfPresent(
+          namespace,
+          (k, v) ->
+              ImmutableMap.<String, String>builder()
+                  .putAll(v)
+                  .putAll(properties)
+                  .buildKeepingLast());
 
-    return true;
+      return true;
+    }
   }
 
   @Override
   public boolean removeProperties(Namespace namespace, Set<String> properties)
       throws NoSuchNamespaceException {
-    if (!namespaceExists(namespace)) {
-      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
-    }
+    synchronized (this) {
+      if (!namespaceExists(namespace)) {
+        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+      }
 
-    namespaces.computeIfPresent(
-        namespace,
-        (k, v) -> {
-          Map<String, String> newProperties = Maps.newHashMap(v);
-          properties.forEach(newProperties::remove);
-          return ImmutableMap.copyOf(newProperties);
-        });
+      namespaces.computeIfPresent(
+          namespace,
+          (k, v) -> {
+            Map<String, String> newProperties = Maps.newHashMap(v);
+            properties.forEach(newProperties::remove);
+            return ImmutableMap.copyOf(newProperties);
+          });
 
-    return true;
+      return true;
+    }
   }
 
   @Override
@@ -278,15 +300,73 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
   public void close() throws IOException {
     namespaces.clear();
     tables.clear();
+    views.clear();
+  }
+
+  @Override
+  public List<TableIdentifier> listViews(Namespace namespace) {
+    if (!namespaceExists(namespace) && !namespace.isEmpty()) {
+      throw new NoSuchNamespaceException(
+          "Cannot list views for namespace. Namespace does not exist: %s", namespace);
+    }
+
+    return views.keySet().stream()
+        .filter(v -> namespace.isEmpty() || v.namespace().equals(namespace))
+        .sorted(Comparator.comparing(TableIdentifier::toString))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
+    return new InMemoryViewOperations(io, identifier);
+  }
+
+  @Override
+  public boolean dropView(TableIdentifier identifier) {
+    synchronized (this) {
+      return null != views.remove(identifier);
+    }
+  }
+
+  @Override
+  public void renameView(TableIdentifier from, TableIdentifier to) {
+    if (from.equals(to)) {
+      return;
+    }
+
+    synchronized (this) {
+      if (!namespaceExists(to.namespace())) {
+        throw new NoSuchNamespaceException(
+            "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
+      }
+
+      String fromViewLocation = views.get(from);
+      if (null == fromViewLocation) {
+        throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
+      }
+
+      if (tables.containsKey(to)) {
+        throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
+      }
+
+      if (views.containsKey(to)) {
+        throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
+      }
+
+      views.put(to, fromViewLocation);
+      views.remove(from);
+    }
   }
 
   private class InMemoryTableOperations extends BaseMetastoreTableOperations {
     private final FileIO fileIO;
     private final TableIdentifier tableIdentifier;
+    private final String fullTableName;
 
     InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
       this.fileIO = fileIO;
       this.tableIdentifier = tableIdentifier;
+      this.fullTableName = fullTableName(catalogName, tableIdentifier);
     }
 
     @Override
@@ -301,40 +381,114 @@ public class InMemoryCatalog extends BaseMetastoreCatalog implements SupportsNam
 
     @Override
     public void doCommit(TableMetadata base, TableMetadata metadata) {
-      String newLocation = writeNewMetadata(metadata, currentVersion() + 1);
+      String newLocation = writeNewMetadataIfRequired(base == null, metadata);
       String oldLocation = base == null ? null : base.metadataFileLocation();
 
-      if (null == base && !namespaceExists(tableIdentifier.namespace())) {
-        throw new NoSuchNamespaceException(
-            "Cannot create table %s. Namespace does not exist: %s",
-            tableIdentifier, tableIdentifier.namespace());
+      synchronized (InMemoryCatalog.this) {
+        if (null == base && !namespaceExists(tableIdentifier.namespace())) {
+          throw new NoSuchNamespaceException(
+              "Cannot create table %s. Namespace does not exist: %s",
+              tableIdentifier, tableIdentifier.namespace());
+        }
+
+        if (views.containsKey(tableIdentifier)) {
+          throw new AlreadyExistsException(
+              "View with same name already exists: %s", tableIdentifier);
+        }
+
+        tables.compute(
+            tableIdentifier,
+            (k, existingLocation) -> {
+              if (!Objects.equal(existingLocation, oldLocation)) {
+                if (null == base) {
+                  throw new AlreadyExistsException("Table already exists: %s", tableName());
+                }
+
+                throw new CommitFailedException(
+                    "Cannot commit to table %s metadata location from %s to %s "
+                        + "because it has been concurrently modified to %s",
+                    tableIdentifier, oldLocation, newLocation, existingLocation);
+              }
+              return newLocation;
+            });
+      }
+    }
+
+    @Override
+    public FileIO io() {
+      return fileIO;
+    }
+
+    @Override
+    protected String tableName() {
+      return fullTableName;
+    }
+  }
+
+  private class InMemoryViewOperations extends BaseViewOperations {
+    private final FileIO io;
+    private final TableIdentifier identifier;
+    private final String fullViewName;
+
+    InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
+      this.io = io;
+      this.identifier = identifier;
+      this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
+    }
+
+    @Override
+    public void doRefresh() {
+      String latestLocation = views.get(identifier);
+      if (latestLocation == null) {
+        disableRefresh();
+      } else {
+        refreshFromMetadataLocation(latestLocation);
       }
+    }
 
-      tables.compute(
-          tableIdentifier,
-          (k, existingLocation) -> {
-            if (!Objects.equal(existingLocation, oldLocation)) {
-              if (null == base) {
-                throw new AlreadyExistsException("Table already exists: %s", tableName());
+    @Override
+    public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+      String newLocation = writeNewMetadataIfRequired(metadata);
+      String oldLocation = base == null ? null : currentMetadataLocation();
+
+      synchronized (InMemoryCatalog.this) {
+        if (null == base && !namespaceExists(identifier.namespace())) {
+          throw new NoSuchNamespaceException(
+              "Cannot create view %s. Namespace does not exist: %s",
+              identifier, identifier.namespace());
+        }
+
+        if (tables.containsKey(identifier)) {
+          throw new AlreadyExistsException("Table with same name already exists: %s", identifier);
+        }
+
+        views.compute(
+            identifier,
+            (k, existingLocation) -> {
+              if (!Objects.equal(existingLocation, oldLocation)) {
+                if (null == base) {
+                  throw new AlreadyExistsException("View already exists: %s", identifier);
+                }
+
+                throw new CommitFailedException(
+                    "Cannot commit to view %s metadata location from %s to %s "
+                        + "because it has been concurrently modified to %s",
+                    identifier, oldLocation, newLocation, existingLocation);
               }
 
-              throw new CommitFailedException(
-                  "Cannot commit to table %s metadata location from %s to %s "
-                      + "because it has been concurrently modified to %s",
-                  tableIdentifier, oldLocation, newLocation, existingLocation);
-            }
-            return newLocation;
-          });
+              return newLocation;
+            });
+      }
     }
 
     @Override
     public FileIO io() {
-      return fileIO;
+      return io;
     }
 
     @Override
-    protected String tableName() {
-      return tableIdentifier.toString();
+    protected String viewName() {
+      return fullViewName;
     }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
new file mode 100644
index 0000000000..42eb80a047
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class BaseMetastoreViewCatalog extends BaseMetastoreCatalog implements ViewCatalog {
+  protected abstract ViewOperations newViewOps(TableIdentifier identifier);
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    super.initialize(name, properties);
+  }
+
+  @Override
+  public String name() {
+    return super.name();
+  }
+
+  @Override
+  public View loadView(TableIdentifier identifier) {
+    if (isValidIdentifier(identifier)) {
+      ViewOperations ops = newViewOps(identifier);
+      if (ops.current() == null) {
+        throw new NoSuchViewException("View does not exist: %s", identifier);
+      } else {
+        return new BaseView(newViewOps(identifier), ViewUtil.fullViewName(name(), identifier));
+      }
+    }
+
+    throw new NoSuchViewException("Invalid view identifier: %s", identifier);
+  }
+
+  @Override
+  public ViewBuilder buildView(TableIdentifier identifier) {
+    return new BaseViewBuilder(identifier);
+  }
+
+  protected class BaseViewBuilder implements ViewBuilder {
+    private final TableIdentifier identifier;
+    private final Map<String, String> properties = Maps.newHashMap();
+    private final List<ViewRepresentation> representations = Lists.newArrayList();
+    private Namespace defaultNamespace = null;
+    private String defaultCatalog = null;
+    private Schema schema = null;
+
+    protected BaseViewBuilder(TableIdentifier identifier) {
+      Preconditions.checkArgument(
+          isValidIdentifier(identifier), "Invalid view identifier: %s", identifier);
+      this.identifier = identifier;
+    }
+
+    @Override
+    public ViewBuilder withSchema(Schema newSchema) {
+      this.schema = newSchema;
+      return this;
+    }
+
+    @Override
+    public ViewBuilder withQuery(String dialect, String sql) {
+      representations.add(
+          ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build());
+      return this;
+    }
+
+    @Override
+    public ViewBuilder withDefaultCatalog(String catalog) {
+      this.defaultCatalog = catalog;
+      return this;
+    }
+
+    @Override
+    public ViewBuilder withDefaultNamespace(Namespace namespace) {
+      this.defaultNamespace = namespace;
+      return this;
+    }
+
+    @Override
+    public ViewBuilder withProperties(Map<String, String> newProperties) {
+      this.properties.putAll(newProperties);
+      return this;
+    }
+
+    @Override
+    public ViewBuilder withProperty(String key, String value) {
+      this.properties.put(key, value);
+      return this;
+    }
+
+    @Override
+    public View create() {
+      return create(newViewOps(identifier));
+    }
+
+    @Override
+    public View replace() {
+      return replace(newViewOps(identifier));
+    }
+
+    @Override
+    public View createOrReplace() {
+      ViewOperations ops = newViewOps(identifier);
+      if (null == ops.current()) {
+        return create(ops);
+      } else {
+        return replace(ops);
+      }
+    }
+
+    private View create(ViewOperations ops) {
+      if (null != ops.current()) {
+        throw new AlreadyExistsException("View already exists: %s", identifier);
+      }
+
+      Preconditions.checkState(
+          !representations.isEmpty(), "Cannot create view without specifying a query");
+      Preconditions.checkState(null != schema, "Cannot create view without specifying schema");
+      Preconditions.checkState(
+          null != defaultNamespace, "Cannot create view without specifying a default namespace");
+
+      ViewVersion viewVersion =
+          ImmutableViewVersion.builder()
+              .versionId(1)
+              .schemaId(schema.schemaId())
+              .addAllRepresentations(representations)
+              .defaultNamespace(defaultNamespace)
+              .defaultCatalog(defaultCatalog)
+              .timestampMillis(System.currentTimeMillis())
+              .putSummary("operation", "create")
+              .build();
+
+      ViewMetadata viewMetadata =
+          ViewMetadata.builder()
+              .setProperties(properties)
+              .setLocation(defaultWarehouseLocation(identifier))
+              .setCurrentVersion(viewVersion, schema)
+              .build();
+
+      try {
+        ops.commit(null, viewMetadata);
+      } catch (CommitFailedException ignored) {
+        throw new AlreadyExistsException("View was created concurrently: %s", identifier);
+      }
+
+      return new BaseView(ops, ViewUtil.fullViewName(name(), identifier));
+    }
+
+    private View replace(ViewOperations ops) {
+      if (null == ops.current()) {
+        throw new NoSuchViewException("View does not exist: %s", identifier);
+      }
+
+      Preconditions.checkState(
+          !representations.isEmpty(), "Cannot replace view without specifying a query");
+      Preconditions.checkState(null != schema, "Cannot replace view without specifying schema");
+      Preconditions.checkState(
+          null != defaultNamespace, "Cannot replace view without specifying a default namespace");
+
+      ViewMetadata metadata = ops.current();
+      int maxVersionId =
+          metadata.versions().stream()
+              .map(ViewVersion::versionId)
+              .max(Integer::compareTo)
+              .orElseGet(metadata::currentVersionId);
+
+      ViewVersion viewVersion =
+          ImmutableViewVersion.builder()
+              .versionId(maxVersionId + 1)
+              .schemaId(schema.schemaId())
+              .addAllRepresentations(representations)
+              .defaultNamespace(defaultNamespace)
+              .defaultCatalog(defaultCatalog)
+              .timestampMillis(System.currentTimeMillis())
+              .putSummary("operation", "replace")
+              .build();
+
+      ViewMetadata replacement =
+          ViewMetadata.buildFrom(metadata)
+              .setProperties(properties)
+              .setCurrentVersion(viewVersion, schema)
+              .build();
+
+      try {
+        ops.commit(metadata, replacement);
+      } catch (CommitFailedException ignored) {
+        throw new AlreadyExistsException("View was updated concurrently: %s", identifier);
+      }
+
+      return new BaseView(ops, ViewUtil.fullViewName(name(), identifier));
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseView.java b/core/src/main/java/org/apache/iceberg/view/BaseView.java
new file mode 100644
index 0000000000..a21bc2381f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/BaseView.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+
+public class BaseView implements View, Serializable {
+
+  private final ViewOperations ops;
+  private final String name;
+
+  public BaseView(ViewOperations ops, String name) {
+    this.ops = ops;
+    this.name = name;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  public ViewOperations operations() {
+    return ops;
+  }
+
+  @Override
+  public Schema schema() {
+    return operations().current().schema();
+  }
+
+  @Override
+  public Map<Integer, Schema> schemas() {
+    return operations().current().schemasById();
+  }
+
+  @Override
+  public ViewVersion currentVersion() {
+    return operations().current().currentVersion();
+  }
+
+  @Override
+  public Iterable<ViewVersion> versions() {
+    return operations().current().versions();
+  }
+
+  @Override
+  public ViewVersion version(int versionId) {
+    return operations().current().version(versionId);
+  }
+
+  @Override
+  public List<ViewHistoryEntry> history() {
+    return operations().current().history();
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return operations().current().properties();
+  }
+
+  @Override
+  public UpdateViewProperties updateProperties() {
+    return new PropertiesUpdate(ops);
+  }
+
+  @Override
+  public ReplaceViewVersion replaceVersion() {
+    return new ViewVersionReplace(ops);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
new file mode 100644
index 0000000000..f7270b9a35
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base {@link ViewOperations} implementation that tracks the most recently loaded {@link
+ * ViewMetadata}, the metadata file location it was loaded from and the version number parsed from
+ * that location.
+ *
+ * <p>Subclasses provide {@link #doRefresh()}, {@link #doCommit(ViewMetadata, ViewMetadata)},
+ * {@link #viewName()} and {@link #io()}.
+ */
+public abstract class BaseViewOperations implements ViewOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseViewOperations.class);
+
+  private static final String METADATA_FOLDER_NAME = "metadata";
+
+  private ViewMetadata currentMetadata = null;
+  private String currentMetadataLocation = null;
+  private boolean shouldRefresh = true;
+  private int version = -1;
+
+  protected BaseViewOperations() {}
+
+  /** Marks the loaded metadata as stale so that the next {@link #current()} call refreshes. */
+  protected void requestRefresh() {
+    this.shouldRefresh = true;
+  }
+
+  /** Clears the stale flag so that {@link #current()} returns the loaded metadata as is. */
+  protected void disableRefresh() {
+    this.shouldRefresh = false;
+  }
+
+  /**
+   * Reloads the view state.
+   *
+   * <p>{@link #refresh()} finishes by calling {@link #current()}, which calls {@link #refresh()}
+   * again while the stale flag is still set. An implementation therefore has to clear the flag
+   * before returning, for example through one of the {@code refreshFromMetadataLocation} overloads
+   * or {@link #disableRefresh()}.
+   */
+  protected abstract void doRefresh();
+
+  /**
+   * Performs the actual commit. Only invoked by {@link #commit(ViewMetadata, ViewMetadata)} after
+   * {@code base} was found to be the current metadata and {@code metadata} differs from it.
+   *
+   * @param base metadata the changes were based on
+   * @param metadata new metadata to commit
+   */
+  protected abstract void doCommit(ViewMetadata base, ViewMetadata metadata);
+
+  /** Returns the view name used in log and exception messages. */
+  protected abstract String viewName();
+
+  /** Returns the {@link FileIO} used to read and write metadata files. */
+  protected abstract FileIO io();
+
+  /** Returns the location the current metadata was loaded from, or null if nothing is loaded. */
+  protected String currentMetadataLocation() {
+    return currentMetadataLocation;
+  }
+
+  /** Returns the version parsed from the current metadata location, or -1 if there is none. */
+  protected int currentVersion() {
+    return version;
+  }
+
+  @Override
+  public ViewMetadata current() {
+    if (shouldRefresh) {
+      return refresh();
+    }
+
+    return currentMetadata;
+  }
+
+  @Override
+  public ViewMetadata refresh() {
+    boolean currentMetadataWasAvailable = currentMetadata != null;
+    try {
+      doRefresh();
+    } catch (NoSuchViewException e) {
+      if (currentMetadataWasAvailable) {
+        LOG.warn("Could not find the view during refresh, setting current metadata to null", e);
+        shouldRefresh = true;
+      }
+
+      // forget everything that was loaded before rethrowing
+      currentMetadata = null;
+      currentMetadataLocation = null;
+      version = -1;
+      throw e;
+    }
+
+    return current();
+  }
+
+  @Override
+  public void commit(ViewMetadata base, ViewMetadata metadata) {
+    // if the metadata is already out of date, reject it
+    if (base != current()) {
+      if (base != null) {
+        throw new CommitFailedException("Cannot commit: stale view metadata");
+      } else {
+        // when current is non-null, the view exists. but when base is null, the commit is trying
+        // to create the view
+        throw new AlreadyExistsException("View already exists: %s", viewName());
+      }
+    }
+
+    // if the metadata is not changed, return early
+    if (base == metadata) {
+      LOG.info("Nothing to commit.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+    doCommit(base, metadata);
+    requestRefresh();
+
+    LOG.info(
+        "Successfully committed to view {} in {} ms",
+        viewName(),
+        System.currentTimeMillis() - start);
+  }
+
+  // writes the metadata to a new, uniquely named file and returns the location of that file
+  private String writeNewMetadata(ViewMetadata metadata, int newVersion) {
+    String newMetadataFilePath = newMetadataFilePath(metadata, newVersion);
+    OutputFile newMetadataLocation = io().newOutputFile(newMetadataFilePath);
+
+    // write the new metadata
+    // use overwrite to avoid negative caching in S3. this is safe because the metadata location is
+    // always unique because it includes a UUID.
+    ViewMetadataParser.overwrite(metadata, newMetadataLocation);
+
+    return newMetadataLocation.location();
+  }
+
+  /**
+   * Returns the metadata file location of the given metadata, writing a new metadata file for the
+   * next version first when the metadata has no file location yet.
+   *
+   * @param metadata view metadata to persist
+   * @return location of the metadata file
+   */
+  protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
+    return null != metadata.metadataFileLocation()
+        ? metadata.metadataFileLocation()
+        : writeNewMetadata(metadata, version + 1);
+  }
+
+  // file name layout: <zero-padded version>-<random UUID>.metadata.json
+  private String newMetadataFilePath(ViewMetadata metadata, int newVersion) {
+    return metadataFileLocation(
+        metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), ".metadata.json"));
+  }
+
+  // resolves the file name against <view location>/metadata/
+  private String metadataFileLocation(ViewMetadata metadata, String filename) {
+    return String.format(
+        "%s/%s/%s",
+        LocationUtil.stripTrailingSlash(metadata.location()), METADATA_FOLDER_NAME, filename);
+  }
+
+  /**
+   * Refreshes from the given metadata location with no custom retry predicate and 20 retries.
+   *
+   * @param newLocation location of the metadata file to load
+   */
+  protected void refreshFromMetadataLocation(String newLocation) {
+    refreshFromMetadataLocation(newLocation, null, 20);
+  }
+
+  /**
+   * Refreshes from the given metadata location, reading the file through {@link #io()} and {@link
+   * ViewMetadataParser}.
+   *
+   * @param newLocation location of the metadata file to load
+   * @param shouldRetry predicate passed to the retry logic, may be null
+   * @param numRetries number of retries for loading the metadata
+   */
+  protected void refreshFromMetadataLocation(
+      String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
+    refreshFromMetadataLocation(
+        newLocation,
+        shouldRetry,
+        numRetries,
+        metadataLocation -> ViewMetadataParser.read(io().newInputFile(metadataLocation)));
+  }
+
+  /**
+   * Loads the metadata from the given location if it differs from the currently loaded location
+   * and clears the stale flag in either case.
+   *
+   * @param newLocation location of the metadata file to load
+   * @param shouldRetry predicate passed to the retry logic, may be null
+   * @param numRetries number of retries for loading the metadata
+   * @param metadataLoader function that turns a metadata location into {@link ViewMetadata}
+   */
+  protected void refreshFromMetadataLocation(
+      String newLocation,
+      Predicate<Exception> shouldRetry,
+      int numRetries,
+      Function<String, ViewMetadata> metadataLoader) {
+    if (!Objects.equal(currentMetadataLocation, newLocation)) {
+      LOG.info("Refreshing view metadata from new version: {}", newLocation);
+
+      AtomicReference<ViewMetadata> newMetadata = new AtomicReference<>();
+      Tasks.foreach(newLocation)
+          .retry(numRetries)
+          .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
+          .throwFailureWhenFinished()
+          .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null
+          .shouldRetryTest(shouldRetry)
+          .run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));
+
+      this.currentMetadata = newMetadata.get();
+      this.currentMetadataLocation = newLocation;
+      this.version = parseVersion(newLocation);
+    }
+
+    this.shouldRefresh = false;
+  }
+
+  /**
+   * Parse the version from view metadata file name.
+   *
+   * @param metadataLocation view metadata file location
+   * @return version of the view metadata file in success case and -1 if the version is not parsable
+   *     (as a sign that the metadata is not part of this catalog)
+   */
+  private static int parseVersion(String metadataLocation) {
+    int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
+    int versionEnd = metadataLocation.indexOf('-', versionStart);
+    if (versionEnd < 0) {
+      // found filesystem view's metadata
+      return -1;
+    }
+
+    try {
+      // parseInt yields the primitive directly; valueOf would box and immediately unbox
+      return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd));
+    } catch (NumberFormatException e) {
+      LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e);
+      return -1;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java 
b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java
new file mode 100644
index 0000000000..48bcfc3a68
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+
+/**
+ * {@link UpdateViewProperties} implementation that collects property updates and removals and
+ * applies them on top of the refreshed view metadata.
+ */
+class PropertiesUpdate implements UpdateViewProperties {
+  private final ViewOperations ops;
+  private final Map<String, String> updates = Maps.newHashMap();
+  private final Set<String> removals = Sets.newHashSet();
+  private ViewMetadata base;
+
+  PropertiesUpdate(ViewOperations ops) {
+    this.ops = ops;
+    this.base = ops.current();
+  }
+
+  @Override
+  public Map<String, String> apply() {
+    return internalApply().properties();
+  }
+
+  // refreshes base and returns new metadata with the pending updates and removals applied to it
+  private ViewMetadata internalApply() {
+    this.base = ops.refresh();
+
+    return ViewMetadata.buildFrom(base).setProperties(updates).removeProperties(removals).build();
+  }
+
+  /**
+   * Commits the pending property changes, retrying on {@link CommitFailedException} with the retry
+   * settings read from the properties of the base metadata.
+   */
+  @Override
+  public void commit() {
+    Tasks.foreach(ops)
+        .retry(
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */)
+        .onlyRetryOn(CommitFailedException.class)
+        .run(
+            taskOps -> {
+              // internalApply() reassigns base from a refresh. Java evaluates call arguments left
+              // to right, so it has to run before base is read; otherwise the commit is attempted
+              // against the metadata from before the refresh.
+              ViewMetadata updated = internalApply();
+              taskOps.commit(base, updated);
+            });
+  }
+
+  /**
+   * Registers a property to set.
+   *
+   * @param key property key, must not be null
+   * @param value property value, must not be null
+   * @return this for method chaining
+   * @throws IllegalArgumentException if key or value is null or the key is marked for removal
+   */
+  @Override
+  public UpdateViewProperties set(String key, String value) {
+    Preconditions.checkArgument(null != key, "Invalid key: null");
+    Preconditions.checkArgument(null != value, "Invalid value: null");
+    Preconditions.checkArgument(
+        !removals.contains(key), "Cannot remove and update the same key: %s", key);
+
+    updates.put(key, value);
+    return this;
+  }
+
+  /**
+   * Registers a property to remove.
+   *
+   * @param key property key, must not be null
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the key is null or is already registered as an update
+   */
+  @Override
+  public UpdateViewProperties remove(String key) {
+    Preconditions.checkArgument(null != key, "Invalid key: null");
+    Preconditions.checkArgument(
+        !updates.containsKey(key), "Cannot remove and update the same key: %s", key);
+
+    removals.add(key);
+    return this;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java 
b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
index 43dba2f6b2..68619ddb22 100644
--- a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
@@ -35,6 +35,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.PropertyUtil;
 import org.immutables.value.Value;
 import org.immutables.value.Value.Style.ImplementationVisibility;
@@ -272,6 +273,17 @@ public interface ViewMetadata extends Serializable {
           "Cannot add version with unknown schema: %s",
           version.schemaId());
 
+      Set<String> dialects = Sets.newHashSet();
+      for (ViewRepresentation repr : version.representations()) {
+        if (repr instanceof SQLViewRepresentation) {
+          SQLViewRepresentation sql = (SQLViewRepresentation) repr;
+          Preconditions.checkArgument(
+              dialects.add(sql.dialect()),
+              "Invalid view version: Cannot add multiple queries for dialect 
%s",
+              sql.dialect());
+        }
+      }
+
       ViewVersion newVersion;
       if (newVersionId != version.versionId()) {
         newVersion = 
ImmutableViewVersion.builder().from(version).versionId(newVersionId).build();
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewOperations.java 
b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java
new file mode 100644
index 0000000000..37133161b9
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+/**
+ * SPI interface to abstract view metadata access and updates.
+ *
+ * <p>It exposes the loaded metadata through {@link #current()}, an update check through {@link
+ * #refresh()} and metadata replacement through {@link #commit(ViewMetadata, ViewMetadata)}.
+ */
+public interface ViewOperations {
+
+  /**
+   * Return the currently loaded view metadata, without checking for updates.
+   *
+   * @return view metadata
+   */
+  ViewMetadata current();
+
+  /**
+   * Return the current view metadata after checking for updates.
+   *
+   * @return view metadata
+   */
+  ViewMetadata refresh();
+
+  /**
+   * Replace the base view metadata with a new version.
+   *
+   * <p>This method should implement and document atomicity guarantees.
+   *
+   * <p>Implementations must check that the base metadata is current to avoid overwriting updates.
+   * Once the atomic commit operation succeeds, implementations must not perform any operations that
+   * may fail because failure in this method cannot be distinguished from commit failure.
+   *
+   * <p>Implementations should throw a {@link
+   * org.apache.iceberg.exceptions.CommitStateUnknownException} in cases where it cannot be
+   * determined if the commit succeeded or failed. For example if a network partition causes the
+   * confirmation of the commit to be lost, the implementation should throw a
+   * CommitStateUnknownException. An unknown state indicates to downstream users of this API that it
+   * is not safe to perform clean up and remove any files. In general, strict metadata cleanup will
+   * only trigger cleanups when the commit fails with an exception implementing the marker interface
+   * {@link org.apache.iceberg.exceptions.CleanableFailure}. All other exceptions will be treated as
+   * if the commit has failed.
+   *
+   * @param base view metadata on which changes were based
+   * @param metadata new view metadata with updates
+   */
+  void commit(ViewMetadata base, ViewMetadata metadata);
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewUtil.java 
b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java
new file mode 100644
index 0000000000..b79e2c3ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Static helper methods for views; the private constructor prevents instantiation. */
+public class ViewUtil {
+  private ViewUtil() {}
+
+  /**
+   * Builds a view name by joining the catalog name and the identifier with a dot.
+   *
+   * @param catalog catalog name used as the prefix
+   * @param ident identifier whose string form is used as the suffix
+   * @return {@code catalog + "." + ident}
+   */
+  public static String fullViewName(String catalog, TableIdentifier ident) {
+    return String.join(".", catalog, String.valueOf(ident));
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java 
b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
new file mode 100644
index 0000000000..15f05c531d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static 
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+
+/**
+ * {@link ReplaceViewVersion} implementation that builds a new view version from the configured
+ * queries, schema, default namespace and default catalog and makes it the current version.
+ */
+class ViewVersionReplace implements ReplaceViewVersion {
+  private final ViewOperations ops;
+  private final List<ViewRepresentation> representations = Lists.newArrayList();
+  private ViewMetadata base;
+  private Namespace defaultNamespace = null;
+  private String defaultCatalog = null;
+  private Schema schema = null;
+
+  ViewVersionReplace(ViewOperations ops) {
+    this.ops = ops;
+    this.base = ops.current();
+  }
+
+  @Override
+  public ViewVersion apply() {
+    return internalApply().currentVersion();
+  }
+
+  // validates the configured state, refreshes base and returns metadata with the new version set
+  // as the current version
+  private ViewMetadata internalApply() {
+    Preconditions.checkState(
+        !representations.isEmpty(), "Cannot replace view without specifying a query");
+    Preconditions.checkState(null != schema, "Cannot replace view without specifying schema");
+    Preconditions.checkState(
+        null != defaultNamespace, "Cannot replace view without specifying a default namespace");
+
+    this.base = ops.refresh();
+
+    // the new version id is one past the highest id among the known versions
+    ViewVersion viewVersion = base.currentVersion();
+    int maxVersionId =
+        base.versions().stream()
+            .map(ViewVersion::versionId)
+            .max(Integer::compareTo)
+            .orElseGet(viewVersion::versionId);
+
+    ViewVersion newVersion =
+        ImmutableViewVersion.builder()
+            .versionId(maxVersionId + 1)
+            .timestampMillis(System.currentTimeMillis())
+            .schemaId(schema.schemaId())
+            .defaultNamespace(defaultNamespace)
+            .defaultCatalog(defaultCatalog)
+            .putSummary("operation", "replace")
+            .addAllRepresentations(representations)
+            .build();
+
+    return ViewMetadata.buildFrom(base).setCurrentVersion(newVersion, schema).build();
+  }
+
+  /**
+   * Commits the new view version, retrying on {@link CommitFailedException} with the retry
+   * settings read from the properties of the base metadata.
+   */
+  @Override
+  public void commit() {
+    Tasks.foreach(ops)
+        .retry(
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            PropertyUtil.propertyAsInt(
+                base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */)
+        .onlyRetryOn(CommitFailedException.class)
+        .run(
+            taskOps -> {
+              // internalApply() reassigns base from a refresh. Java evaluates call arguments left
+              // to right, so it has to run before base is read; otherwise the commit is attempted
+              // against the metadata from before the refresh.
+              ViewMetadata updated = internalApply();
+              taskOps.commit(base, updated);
+            });
+  }
+
+  /**
+   * Adds a SQL representation for the given dialect to the new version.
+   *
+   * @param dialect SQL dialect of the query
+   * @param sql SQL text of the query
+   * @return this for method chaining
+   */
+  @Override
+  public ReplaceViewVersion withQuery(String dialect, String sql) {
+    representations.add(ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build());
+    return this;
+  }
+
+  /**
+   * Sets the schema of the new version; required before applying or committing.
+   *
+   * @param newSchema schema of the new version
+   * @return this for method chaining
+   */
+  @Override
+  public ReplaceViewVersion withSchema(Schema newSchema) {
+    this.schema = newSchema;
+    return this;
+  }
+
+  /**
+   * Sets the default catalog of the new version; it stays null when this is never called.
+   *
+   * @param catalog default catalog name
+   * @return this for method chaining
+   */
+  @Override
+  public ReplaceViewVersion withDefaultCatalog(String catalog) {
+    this.defaultCatalog = catalog;
+    return this;
+  }
+
+  /**
+   * Sets the default namespace of the new version; required before applying or committing.
+   *
+   * @param namespace default namespace
+   * @return this for method chaining
+   */
+  @Override
+  public ReplaceViewVersion withDefaultNamespace(Namespace namespace) {
+    this.defaultNamespace = namespace;
+    return this;
+  }
+}
diff --git 
a/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java 
b/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java
new file mode 100644
index 0000000000..76731f58a6
--- /dev/null
+++ 
b/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.inmemory;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.view.ViewCatalogTests;
+import org.junit.jupiter.api.BeforeEach;
+
+/** Runs the shared {@link ViewCatalogTests} suite against an {@link InMemoryCatalog}. */
+public class TestInMemoryViewCatalog extends ViewCatalogTests<InMemoryCatalog> {
+  private InMemoryCatalog catalog;
+
+  // every test starts from a newly created and initialized catalog
+  @BeforeEach
+  public void before() {
+    catalog = new InMemoryCatalog();
+    catalog.initialize("in-memory-catalog", ImmutableMap.of());
+  }
+
+  // the same catalog instance is handed out as view catalog and as table catalog
+  @Override
+  protected Catalog tableCatalog() {
+    return catalog;
+  }
+
+  @Override
+  protected InMemoryCatalog catalog() {
+    return catalog;
+  }
+
+  @Override
+  protected boolean requiresNamespaceCreate() {
+    return true;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java 
b/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
index 4706640f53..7837d94055 100644
--- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
@@ -727,4 +727,35 @@ public class TestViewMetadata {
         .containsExactly(schemaOne.asStruct(), schemaTwo.asStruct(), 
schemaThree.asStruct());
     assertThat(viewMetadata.schemasById().keySet()).containsExactly(0, 1, 2);
   }
+
+  // adding a version with two SQL representations for the same dialect must be rejected
+  @Test
+  public void viewMetadataWithMultipleSQLForSameDialect() {
+    Schema schema = new Schema(Types.NestedField.required(1, "x", Types.LongType.get()));
+    SQLViewRepresentation firstSparkQuery =
+        ImmutableSQLViewRepresentation.builder()
+            .dialect("spark")
+            .sql("select * from ns.tbl")
+            .build();
+    SQLViewRepresentation secondSparkQuery =
+        ImmutableSQLViewRepresentation.builder()
+            .dialect("spark")
+            .sql("select * from ns.tbl2")
+            .build();
+
+    assertThatThrownBy(
+            () ->
+                ViewMetadata.builder()
+                    .setLocation("custom-location")
+                    .addSchema(schema)
+                    .addVersion(
+                        ImmutableViewVersion.builder()
+                            .schemaId(0)
+                            .versionId(1)
+                            .timestampMillis(23L)
+                            .putSummary("operation", "create")
+                            .defaultNamespace(Namespace.of("ns"))
+                            .addRepresentations(firstSparkQuery)
+                            .addRepresentations(secondSparkQuery)
+                            .build())
+                    .setCurrentVersionId(1)
+                    .build())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid view version: Cannot add multiple queries for dialect spark");
+  }
 }
diff --git 
a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java 
b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
index 5fb9589522..267195c133 100644
--- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
+++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
@@ -226,4 +227,85 @@ public class TestViewMetadataParser {
         .isEqualTo(expectedViewMetadata);
     assertThat(actual.metadataFileLocation()).isEqualTo(metadataLocation);
   }
+
+  // metadata files that already contain several SQLs for one dialect must still be parseable
+  @Test
+  public void viewMetadataWithMultipleSQLsForDialectShouldBeReadable() throws Exception {
+    String json =
+        readViewMetadataInputFile(
+            "org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json");
+
+    // both representations deliberately use the same dialect
+    SQLViewRepresentation selectLiteral =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select 'foo' foo")
+            .dialect("spark-sql")
+            .build();
+    SQLViewRepresentation selectAll =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from foo")
+            .dialect("spark-sql")
+            .build();
+
+    ViewVersion viewVersion =
+        ImmutableViewVersion.builder()
+            .versionId(1)
+            .timestampMillis(4353L)
+            .summary(ImmutableMap.of("operation", "create"))
+            .schemaId(0)
+            .defaultCatalog("some-catalog")
+            .defaultNamespace(Namespace.empty())
+            .addRepresentations(selectLiteral)
+            .addRepresentations(selectAll)
+            .build();
+
+    ViewHistoryEntry historyEntry =
+        ImmutableViewHistoryEntry.builder().versionId(1).timestampMillis(4353).build();
+
+    // builder will throw an exception due to having multiple SQLs for the same dialect, thus
+    // construct the expected view metadata directly
+    ViewMetadata expectedViewMetadata =
+        ImmutableViewMetadata.of(
+            "fa6506c3-7681-40c8-86dc-e36561f83385",
+            1,
+            "s3://bucket/test/location",
+            ImmutableList.of(TEST_SCHEMA),
+            1,
+            ImmutableList.of(viewVersion),
+            ImmutableList.of(historyEntry),
+            ImmutableMap.of("some-key", "some-value"),
+            ImmutableList.of(),
+            null);
+
+    // reading view metadata with multiple SQLs for the same dialects shouldn't fail
+    ViewMetadata actual = ViewMetadataParser.fromJson(json);
+    assertThat(actual)
+        .usingRecursiveComparison()
+        .ignoringFieldsOfTypes(Schema.class)
+        .isEqualTo(expectedViewMetadata);
+  }
+
+  // metadata with duplicate dialects can be repaired by adding a version with a single SQL
+  @Test
+  public void replaceViewMetadataWithMultipleSQLsForDialect() throws Exception {
+    String json =
+        readViewMetadataInputFile(
+            "org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json");
+
+    // reading view metadata with multiple SQLs for the same dialects shouldn't fail
+    ViewMetadata withDuplicateDialects = ViewMetadataParser.fromJson(json);
+
+    SQLViewRepresentation singleSparkQuery =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from foo")
+            .dialect("spark-sql")
+            .build();
+
+    // replace metadata with a new view version that fixes the SQL representations
+    ViewVersion fixedVersion =
+        ImmutableViewVersion.builder()
+            .versionId(2)
+            .schemaId(0)
+            .timestampMillis(5555L)
+            .summary(ImmutableMap.of("operation", "replace"))
+            .defaultCatalog("some-catalog")
+            .defaultNamespace(Namespace.empty())
+            .addRepresentations(singleSparkQuery)
+            .build();
+
+    ViewMetadata replaced =
+        ViewMetadata.buildFrom(withDuplicateDialects)
+            .addVersion(fixedVersion)
+            .setCurrentVersionId(2)
+            .build();
+
+    assertThat(replaced.currentVersion()).isEqualTo(fixedVersion);
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java 
b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
new file mode 100644
index 0000000000..1c95955383
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
@@ -0,0 +1,1340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public abstract class ViewCatalogTests<C extends ViewCatalog & 
SupportsNamespaces> {
+  // Schema used when creating views. It is declared with schema ID 5, yet the tests below expect
+  // a freshly created view to report schema ID 0.
+  protected static final Schema SCHEMA =
+      new Schema(
+          5,
+          required(3, "id", Types.IntegerType.get(), "unique ID"),
+          required(4, "data", Types.StringType.get()));
+
+  // Alternate schema (declared with schema ID 7) used for replace and conflicting-create attempts.
+  private static final Schema OTHER_SCHEMA =
+      new Schema(7, required(1, "some_id", Types.IntegerType.get()));
+
+  // View catalog under test; per the bound on C it must also implement SupportsNamespaces.
+  protected abstract C catalog();
+
+  // Table catalog used by the table/view conflict tests. Those tests are skipped through an
+  // AssertJ assumption when this returns null.
+  protected abstract Catalog tableCatalog();
+
+  // When this returns true, the tests call createNamespace(...) before creating views or tables.
+  // Defaults to false.
+  protected boolean requiresNamespaceCreate() {
+    return false;
+  }
+
+  /**
+   * Creates a view with only the mandatory settings (schema, default namespace, one "spark"
+   * query) and verifies its name, history, schema, versions and current version before dropping
+   * it again.
+   */
+  @Test
+  public void basicCreateView() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .create();
+
+    assertThat(view).isNotNull();
+    assertThat(catalog().viewExists(identifier)).as("View should 
exist").isTrue();
+
+    // validate view settings
+    assertThat(view.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(), 
identifier));
+    assertThat(view.history())
+        .hasSize(1)
+        .first()
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(1);
+    // the view reports schema ID 0 even though SCHEMA was declared with ID 5
+    assertThat(view.schema().schemaId()).isEqualTo(0);
+    assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
+    assertThat(view.schemas()).hasSize(1).containsKey(0);
+    
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());
+
+    // the timestamp is copied from the actual version so that only the other fields are compared
+    assertThat(view.currentVersion())
+        .isEqualTo(
+            ImmutableViewVersion.builder()
+                .timestampMillis(view.currentVersion().timestampMillis())
+                .versionId(1)
+                .schemaId(0)
+                .putSummary("operation", "create")
+                .defaultNamespace(identifier.namespace())
+                .addRepresentations(
+                    ImmutableSQLViewRepresentation.builder()
+                        .sql("select * from ns.tbl")
+                        .dialect("spark")
+                        .build())
+                .build());
+
+    assertThat(catalog().dropView(identifier)).isTrue();
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+  }
+
+  /**
+   * Creates a view using every builder option exercised here (schema, default namespace, default
+   * catalog, "spark" and "trino" queries, two properties) and verifies that all of them are
+   * reflected in the created view before dropping it again.
+   */
+  @Test
+  public void completeCreateView() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withDefaultCatalog(catalog().name())
+            .withQuery("spark", "select * from ns.tbl")
+            .withQuery("trino", "select * from ns.tbl using X")
+            .withProperty("prop1", "val1")
+            .withProperty("prop2", "val2")
+            .create();
+
+    assertThat(view).isNotNull();
+    assertThat(catalog().viewExists(identifier)).as("View should 
exist").isTrue();
+
+    // validate view settings
+    assertThat(view.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(), 
identifier));
+    assertThat(view.properties()).containsEntry("prop1", 
"val1").containsEntry("prop2", "val2");
+    assertThat(view.history())
+        .hasSize(1)
+        .first()
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(1);
+    assertThat(view.schema().schemaId()).isEqualTo(0);
+    assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
+    assertThat(view.schemas()).hasSize(1).containsKey(0);
+    
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());
+
+    // the timestamp is copied from the actual version so that only the other fields are compared
+    assertThat(view.currentVersion())
+        .isEqualTo(
+            ImmutableViewVersion.builder()
+                .timestampMillis(view.currentVersion().timestampMillis())
+                .versionId(1)
+                .schemaId(0)
+                .putSummary("operation", "create")
+                .defaultNamespace(identifier.namespace())
+                .defaultCatalog(catalog().name())
+                .addRepresentations(
+                    ImmutableSQLViewRepresentation.builder()
+                        .sql("select * from ns.tbl")
+                        .dialect("spark")
+                        .build())
+                .addRepresentations(
+                    ImmutableSQLViewRepresentation.builder()
+                        .sql("select * from ns.tbl using X")
+                        .dialect("trino")
+                        .build())
+                .build());
+
+    assertThat(catalog().dropView(identifier)).isTrue();
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+  }
+
+  /**
+   * Verifies the validation errors raised by {@code create()}: a missing query, schema or default
+   * namespace is expected to fail with {@link IllegalStateException}, and two queries for the
+   * same dialect with {@link IllegalArgumentException}.
+   */
+  @Test
+  public void createViewErrorCases() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+
+    // only used as a holder for the dialect/SQL pair passed to withQuery(...) below
+    SQLViewRepresentation trino =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from ns.tbl")
+            .dialect("trino")
+            .build();
+
+    // query is required
+    assertThatThrownBy(() -> catalog().buildView(identifier).create())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot create view without specifying a query");
+
+    // schema is required
+    assertThatThrownBy(
+            () -> catalog().buildView(identifier).withQuery(trino.dialect(), 
trino.sql()).create())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot create view without specifying schema");
+
+    // default namespace is required
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(identifier)
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withSchema(SCHEMA)
+                    .create())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot create view without specifying a default 
namespace");
+
+    // cannot define multiple SQLs for same dialect
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(identifier)
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withQuery(trino.dialect(), trino.sql())
+                    .create())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid view version: Cannot add multiple queries for 
dialect trino");
+  }
+
+  /**
+   * Creates a view and then verifies that a second {@code create()} for the same identifier fails
+   * with {@link AlreadyExistsException}.
+   */
+  @Test
+  public void createViewThatAlreadyExists() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .create();
+
+    assertThat(view).isNotNull();
+    assertThat(catalog().viewExists(identifier)).as("View should 
exist").isTrue();
+
+    // second create for the same identifier, this time with a different schema
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(identifier)
+                    .withSchema(OTHER_SCHEMA)
+                    .withQuery("spark", "select * from ns.tbl")
+                    .withDefaultNamespace(identifier.namespace())
+                    .create())
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageStartingWith("View already exists: ns.view");
+  }
+
+  /**
+   * Creates a table and then verifies that creating a view with the same identifier fails with
+   * {@link AlreadyExistsException}. Skipped when {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void createViewThatAlreadyExistsAsTable() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(tableIdentifier.namespace());
+    }
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
not exist").isFalse();
+
+    tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
exist").isTrue();
+
+    // the view builder is given the identifier that is already taken by the table
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(tableIdentifier)
+                    .withSchema(OTHER_SCHEMA)
+                    .withDefaultNamespace(tableIdentifier.namespace())
+                    .withQuery("spark", "select * from ns.tbl")
+                    .create())
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageStartingWith("Table with same name already exists: 
ns.table");
+  }
+
+  /**
+   * Creates a view and then verifies that creating a table with the same identifier fails with
+   * {@link AlreadyExistsException}. Skipped when {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void createTableThatAlreadyExistsAsView() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(viewIdentifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should not 
exist").isFalse();
+
+    catalog()
+        .buildView(viewIdentifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(viewIdentifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should 
exist").isTrue();
+
+    // the table builder is given the identifier that is already taken by the view
+    assertThatThrownBy(() -> tableCatalog().buildTable(viewIdentifier, 
SCHEMA).create())
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageStartingWith("View with same name already exists: ns.view");
+  }
+
+  /**
+   * Starts a create-table transaction, then creates a view with the same identifier, and verifies
+   * that committing the transaction fails with {@link AlreadyExistsException}. Skipped when
+   * {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void createTableViaTransactionThatAlreadyExistsAsView() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(viewIdentifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should not 
exist").isFalse();
+
+    // the transaction is opened while the identifier is still free; the view is created afterwards
+    Transaction transaction = tableCatalog().buildTable(viewIdentifier, 
SCHEMA).createTransaction();
+
+    catalog()
+        .buildView(viewIdentifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(viewIdentifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should 
exist").isTrue();
+
+    // the conflict is expected to surface at commit time
+    assertThatThrownBy(transaction::commitTransaction)
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageStartingWith("View with same name already exists: ns.view");
+  }
+
+  /**
+   * Creates a view and then verifies that a replace-table transaction for the same identifier
+   * fails with {@link NoSuchTableException}. Skipped when {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void replaceTableViaTransactionThatAlreadyExistsAsView() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(viewIdentifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should not 
exist").isFalse();
+
+    catalog()
+        .buildView(viewIdentifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(viewIdentifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should 
exist").isTrue();
+
+    // replace transaction requires table existence
+    // TODO: replace should check whether the table exists as a view
+    assertThatThrownBy(
+            () ->
+                tableCatalog()
+                    .buildTable(viewIdentifier, SCHEMA)
+                    .replaceTransaction()
+                    .commitTransaction())
+        .isInstanceOf(NoSuchTableException.class)
+        .hasMessageStartingWith("Table does not exist: ns.view");
+  }
+
+  /**
+   * Creates a view and then verifies that a create-or-replace table transaction for the same
+   * identifier fails with {@link AlreadyExistsException}. Skipped when {@code tableCatalog()}
+   * returns null.
+   */
+  @Test
+  public void createOrReplaceTableViaTransactionThatAlreadyExistsAsView() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(viewIdentifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should not 
exist").isFalse();
+
+    catalog()
+        .buildView(viewIdentifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(viewIdentifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should 
exist").isTrue();
+
+    // unlike the plain replace case, create-or-replace is expected to report the existing view
+    assertThatThrownBy(
+            () ->
+                tableCatalog()
+                    .buildTable(viewIdentifier, SCHEMA)
+                    .createOrReplaceTransaction()
+                    .commitTransaction())
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageStartingWith("View with same name already exists: ns.view");
+  }
+
+  /**
+   * Creates a table and then verifies that {@code replace()} on a view builder for the same
+   * identifier fails with {@link NoSuchViewException}. Skipped when {@code tableCatalog()}
+   * returns null.
+   */
+  @Test
+  public void replaceViewThatAlreadyExistsAsTable() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(tableIdentifier.namespace());
+    }
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
not exist").isFalse();
+
+    tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
exist").isTrue();
+
+    // replace view requires the view to exist
+    // TODO: replace should check whether the view exists as a table
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(tableIdentifier)
+                    .withSchema(OTHER_SCHEMA)
+                    .withDefaultNamespace(tableIdentifier.namespace())
+                    .withQuery("spark", "select * from ns.tbl")
+                    .replace())
+        .isInstanceOf(NoSuchViewException.class)
+        .hasMessageStartingWith("View does not exist: ns.table");
+  }
+
+  /**
+   * Creates a table and then verifies that {@code createOrReplace()} on a view builder for the
+   * same identifier fails with {@link AlreadyExistsException}. Skipped when
+   * {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void createOrReplaceViewThatAlreadyExistsAsTable() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(tableIdentifier.namespace());
+    }
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
not exist").isFalse();
+
+    tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
exist").isTrue();
+
+    // unlike the plain replace case, create-or-replace is expected to report the existing table
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(tableIdentifier)
+                    .withSchema(OTHER_SCHEMA)
+                    .withDefaultNamespace(tableIdentifier.namespace())
+                    .withQuery("spark", "select * from ns.tbl")
+                    .createOrReplace())
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageStartingWith("Table with same name already exists: 
ns.table");
+  }
+
+  /**
+   * Renames a view within the same namespace and verifies that it exists only under the new name,
+   * that its metadata is unchanged, and that it can be dropped only under the new name.
+   */
+  @Test
+  public void renameView() {
+    TableIdentifier from = TableIdentifier.of("ns", "view");
+    TableIdentifier to = TableIdentifier.of("ns", "renamedView");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(from.namespace());
+    }
+
+    assertThat(catalog().viewExists(from)).as("View should not 
exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(from)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(from.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .create();
+
+    assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+    // capture the metadata before the rename; the cast assumes the catalog returns a BaseView
+    ViewMetadata original = ((BaseView) view).operations().current();
+
+    catalog().renameView(from, to);
+
+    assertThat(catalog().viewExists(from)).as("View should not exist with old 
name").isFalse();
+    assertThat(catalog().viewExists(to)).as("View should exist with new 
name").isTrue();
+
+    // ensure view metadata didn't change after renaming
+    View renamed = catalog().loadView(to);
+    assertThat(((BaseView) renamed).operations().current())
+        .usingRecursiveComparison()
+        .ignoringFieldsOfTypes(Schema.class)
+        .isEqualTo(original);
+
+    // dropping under the old name must report that nothing was dropped
+    assertThat(catalog().dropView(from)).isFalse();
+    assertThat(catalog().dropView(to)).isTrue();
+    assertThat(catalog().viewExists(to)).as("View should not exist").isFalse();
+  }
+
+  /**
+   * Renames a view from namespace "ns" into namespace "other_ns" and verifies that it exists only
+   * under the new identifier, that its metadata is unchanged, and that it can be dropped only
+   * under the new identifier.
+   */
+  @Test
+  public void renameViewUsingDifferentNamespace() {
+    TableIdentifier from = TableIdentifier.of("ns", "view");
+    TableIdentifier to = TableIdentifier.of("other_ns", "renamedView");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(from.namespace());
+      catalog().createNamespace(to.namespace());
+    }
+
+    assertThat(catalog().viewExists(from)).as("View should not 
exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(from)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(from.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .create();
+
+    assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+    // capture the metadata before the rename; the cast assumes the catalog returns a BaseView
+    ViewMetadata original = ((BaseView) view).operations().current();
+
+    catalog().renameView(from, to);
+
+    assertThat(catalog().viewExists(from)).as("View should not exist with old 
name").isFalse();
+    assertThat(catalog().viewExists(to)).as("View should exist with new 
name").isTrue();
+
+    // ensure view metadata didn't change after renaming
+    View renamed = catalog().loadView(to);
+    assertThat(((BaseView) renamed).operations().current())
+        .usingRecursiveComparison()
+        .ignoringFieldsOfTypes(Schema.class)
+        .isEqualTo(original);
+
+    // dropping under the old identifier must report that nothing was dropped
+    assertThat(catalog().dropView(from)).isFalse();
+    assertThat(catalog().dropView(to)).isTrue();
+    assertThat(catalog().viewExists(to)).as("View should not exist").isFalse();
+  }
+
+  /**
+   * Verifies that renaming a view into the namespace "non_existing", which this test never
+   * creates, fails with {@link NoSuchNamespaceException}.
+   */
+  @Test
+  public void renameViewNamespaceMissing() {
+    TableIdentifier from = TableIdentifier.of("ns", "view");
+    TableIdentifier to = TableIdentifier.of("non_existing", "renamedView");
+
+    // only the source namespace is created; the target namespace is left missing on purpose
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(from.namespace());
+    }
+
+    assertThat(catalog().viewExists(from)).as("View should not 
exist").isFalse();
+
+    catalog()
+        .buildView(from)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(from.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+    assertThatThrownBy(() -> catalog().renameView(from, to))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: non_existing");
+  }
+
+  /**
+   * Verifies that renaming a view that was never created fails with
+   * {@link NoSuchViewException}.
+   */
+  @Test
+  public void renameViewSourceMissing() {
+    TableIdentifier from = TableIdentifier.of("ns", "non_existing");
+    TableIdentifier to = TableIdentifier.of("ns", "renamedView");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(from.namespace());
+    }
+
+    assertThat(catalog().viewExists(from)).as("View should not 
exist").isFalse();
+
+    assertThatThrownBy(() -> catalog().renameView(from, to))
+        .isInstanceOf(NoSuchViewException.class)
+        .hasMessageContaining("View does not exist");
+  }
+
+  /**
+   * Creates two views in the same namespace and verifies that renaming one to the name of the
+   * other fails with {@link AlreadyExistsException}.
+   */
+  @Test
+  public void renameViewTargetAlreadyExistsAsView() {
+    TableIdentifier viewOne = TableIdentifier.of("ns", "viewOne");
+    TableIdentifier viewTwo = TableIdentifier.of("ns", "viewTwo");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(viewOne.namespace());
+    }
+
+    // create both views with identical settings
+    for (TableIdentifier identifier : ImmutableList.of(viewOne, viewTwo)) {
+      assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+
+      catalog()
+          .buildView(identifier)
+          .withSchema(SCHEMA)
+          .withDefaultNamespace(viewOne.namespace())
+          .withQuery("spark", "select * from ns.tbl")
+          .create();
+
+      assertThat(catalog().viewExists(identifier)).as("View should 
exist").isTrue();
+    }
+
+    assertThatThrownBy(() -> catalog().renameView(viewOne, viewTwo))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Cannot rename ns.viewOne to ns.viewTwo. View 
already exists");
+  }
+
+  /**
+   * Creates a table and a view in the same namespace and verifies that renaming the view to the
+   * table's identifier fails with {@link AlreadyExistsException}. Skipped when
+   * {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void renameViewTargetAlreadyExistsAsTable() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+    TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+    // both identifiers use namespace "ns", so a single createNamespace call covers them
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(tableIdentifier.namespace());
+    }
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
not exist").isFalse();
+
+    tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+    assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should 
exist").isTrue();
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should not 
exist").isFalse();
+
+    catalog()
+        .buildView(viewIdentifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(viewIdentifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(viewIdentifier)).as("View should 
exist").isTrue();
+
+    assertThatThrownBy(() -> catalog().renameView(viewIdentifier, 
tableIdentifier))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Cannot rename ns.view to ns.table. Table 
already exists");
+  }
+
+  /**
+   * Creates one view in namespace "ns1" and two in "ns2", checking after every create and drop
+   * that {@code listViews} returns exactly the views currently present in each namespace.
+   */
+  @Test
+  public void listViews() {
+    Namespace ns1 = Namespace.of("ns1");
+    Namespace ns2 = Namespace.of("ns2");
+
+    TableIdentifier view1 = TableIdentifier.of(ns1, "view1");
+    TableIdentifier view2 = TableIdentifier.of(ns2, "view2");
+    TableIdentifier view3 = TableIdentifier.of(ns2, "view3");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(ns1);
+      catalog().createNamespace(ns2);
+    }
+
+    assertThat(catalog().listViews(ns1)).isEmpty();
+    assertThat(catalog().listViews(ns2)).isEmpty();
+
+    catalog()
+        .buildView(view1)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(view1.namespace())
+        .withQuery("spark", "select * from ns1.tbl")
+        .create();
+
+    assertThat(catalog().listViews(ns1)).containsExactly(view1);
+    assertThat(catalog().listViews(ns2)).isEmpty();
+
+    catalog()
+        .buildView(view2)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(view2.namespace())
+        .withQuery("spark", "select * from ns1.tbl")
+        .create();
+
+    assertThat(catalog().listViews(ns1)).containsExactly(view1);
+    assertThat(catalog().listViews(ns2)).containsExactly(view2);
+
+    catalog()
+        .buildView(view3)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(view3.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    // no ordering is asserted when a namespace holds more than one view
+    assertThat(catalog().listViews(ns1)).containsExactly(view1);
+    assertThat(catalog().listViews(ns2)).containsExactlyInAnyOrder(view2, 
view3);
+
+    assertThat(catalog().dropView(view2)).isTrue();
+    assertThat(catalog().listViews(ns1)).containsExactly(view1);
+    assertThat(catalog().listViews(ns2)).containsExactly(view3);
+
+    assertThat(catalog().dropView(view3)).isTrue();
+    assertThat(catalog().listViews(ns1)).containsExactly(view1);
+    assertThat(catalog().listViews(ns2)).isEmpty();
+
+    assertThat(catalog().dropView(view1)).isTrue();
+    assertThat(catalog().listViews(ns1)).isEmpty();
+    assertThat(catalog().listViews(ns2)).isEmpty();
+  }
+
+  /**
+   * Creates a table and a view in the same namespace and verifies that {@code listViews} returns
+   * only the view and {@code listTables} only the table, after each create and drop. Skipped when
+   * {@code tableCatalog()} returns null.
+   */
+  @Test
+  public void listViewsAndTables() {
+    Assumptions.assumeThat(tableCatalog())
+        .as("Only valid for catalogs that support tables")
+        .isNotNull();
+
+    Namespace ns = Namespace.of("ns");
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(ns, "table");
+    TableIdentifier viewIdentifier = TableIdentifier.of(ns, "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(ns);
+    }
+
+    assertThat(catalog().listViews(ns)).isEmpty();
+    assertThat(tableCatalog().listTables(ns)).isEmpty();
+
+    // creating the table must not make it show up in the view listing
+    tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+    assertThat(catalog().listViews(ns)).isEmpty();
+    assertThat(tableCatalog().listTables(ns)).containsExactly(tableIdentifier);
+
+    catalog()
+        .buildView(viewIdentifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(viewIdentifier.namespace())
+        .withQuery("spark", "select * from ns1.tbl")
+        .create();
+
+    // creating the view must not make it show up in the table listing
+    assertThat(catalog().listViews(ns)).containsExactly(viewIdentifier);
+    assertThat(tableCatalog().listTables(ns)).containsExactly(tableIdentifier);
+
+    assertThat(tableCatalog().dropTable(tableIdentifier)).isTrue();
+    assertThat(catalog().listViews(ns)).containsExactly(viewIdentifier);
+    assertThat(tableCatalog().listTables(ns)).isEmpty();
+
+    assertThat(catalog().dropView(viewIdentifier)).isTrue();
+    assertThat(catalog().listViews(ns)).isEmpty();
+    assertThat(tableCatalog().listTables(ns)).isEmpty();
+  }
+
+  /**
+   * Creates a view and then replaces it with a different schema, query and properties, verifying
+   * the state of the replaced view.
+   *
+   * @param useCreateOrReplace when true, both steps go through {@code createOrReplace()};
+   *     otherwise the first step uses {@code create()} and the second {@code replace()}
+   */
+  @ParameterizedTest(name = ".createOrReplace() = {arguments}")
+  @ValueSource(booleans = {false, true})
+  public void createOrReplaceView(boolean useCreateOrReplace) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+
+    ViewBuilder viewBuilder =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .withProperty("prop1", "val1")
+            .withProperty("prop2", "val2");
+    View view = useCreateOrReplace ? viewBuilder.createOrReplace() : 
viewBuilder.create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should 
exist").isTrue();
+
+    ViewVersion viewVersion = view.currentVersion();
+    assertThat(viewVersion.representations())
+        .containsExactly(
+            ImmutableSQLViewRepresentation.builder()
+                .sql("select * from ns.tbl")
+                .dialect("spark")
+                .build());
+
+    // second builder for the same identifier: different schema, dialect, query and properties
+    viewBuilder =
+        catalog()
+            .buildView(identifier)
+            .withSchema(OTHER_SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("trino", "select count(*) from ns.tbl")
+            .withProperty("replacedProp1", "val1")
+            .withProperty("replacedProp2", "val2");
+    View replacedView = useCreateOrReplace ? viewBuilder.createOrReplace() : 
viewBuilder.replace();
+
+    // validate replaced view settings
+    
assertThat(replacedView.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(),
 identifier));
+    // properties from the original view are expected to survive alongside the new ones
+    assertThat(replacedView.properties())
+        .containsEntry("prop1", "val1")
+        .containsEntry("prop2", "val2")
+        .containsEntry("replacedProp1", "val1")
+        .containsEntry("replacedProp2", "val2");
+    assertThat(replacedView.history())
+        .hasSize(2)
+        .first()
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(1);
+    assertThat(replacedView.history())
+        .element(1)
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(2);
+
+    // the replacement schema is reported with ID 1 even though OTHER_SCHEMA was declared with ID 7
+    assertThat(replacedView.schema().schemaId()).isEqualTo(1);
+    
assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_SCHEMA.asStruct());
+    
assertThat(replacedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
+
+    ViewVersion replacedViewVersion = replacedView.currentVersion();
+    assertThat(replacedView.versions())
+        .hasSize(2)
+        .containsExactly(viewVersion, replacedViewVersion);
+    assertThat(replacedViewVersion).isNotNull();
+    assertThat(replacedViewVersion.versionId()).isEqualTo(2);
+    assertThat(replacedViewVersion.schemaId()).isEqualTo(1);
+    assertThat(replacedViewVersion.operation()).isEqualTo("replace");
+    
assertThat(replacedViewVersion.summary()).hasSize(1).containsEntry("operation", 
"replace");
+    // the new version carries only the trino representation, not the original spark one
+    assertThat(replacedViewVersion.representations())
+        .containsExactly(
+            ImmutableSQLViewRepresentation.builder()
+                .sql("select count(*) from ns.tbl")
+                .dialect("trino")
+                .build());
+
+    assertThat(catalog().dropView(identifier)).isTrue();
+    assertThat(catalog().viewExists(identifier)).as("View should not 
exist").isFalse();
+  }
+
+  @Test
+  public void replaceViewErrorCases() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    SQLViewRepresentation trino =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from ns.tbl")
+            .dialect("trino")
+            .build();
+
+    catalog()
+        .buildView(identifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(identifier.namespace())
+        .withQuery(trino.dialect(), trino.sql())
+        .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+
+    // query is required
+    assertThatThrownBy(() -> catalog().buildView(identifier).replace())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot replace view without specifying a query");
+
+    // schema is required
+    assertThatThrownBy(
+            () -> catalog().buildView(identifier).withQuery(trino.dialect(), trino.sql()).replace())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot replace view without specifying schema");
+
+    // default namespace is required
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(identifier)
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withSchema(SCHEMA)
+                    .replace())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot replace view without specifying a default namespace");
+
+    // cannot replace non-existing view
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(TableIdentifier.of("ns", "non_existing"))
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .replace())
+        .isInstanceOf(NoSuchViewException.class)
+        .hasMessageStartingWith("View does not exist: ns.non_existing");
+
+    // cannot define multiple SQLs for same dialect
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .buildView(identifier)
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withQuery(trino.dialect(), trino.sql())
+                    .replace())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid view version: Cannot add multiple queries for dialect trino");
+  }
+
+  @Test
+  public void updateViewProperties() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("spark", "select * from ns.tbl")
+            .create();
+
+    ViewVersion viewVersion = view.currentVersion();
+
+    view.updateProperties().set("key1", "val1").set("key2", "val2").remove("non-existing").commit();
+
+    View updatedView = catalog().loadView(identifier);
+    assertThat(updatedView.properties())
+        .containsEntry("key1", "val1")
+        .containsEntry("key2", "val2");
+
+    // history and view versions should stay the same after updating view properties
+    assertThat(updatedView.history()).hasSize(1).isEqualTo(view.history());
+    assertThat(updatedView.versions()).hasSize(1).containsExactly(viewVersion);
+
+    assertThat(catalog().dropView(identifier)).isTrue();
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+  }
+
+  @Test
+  public void updateViewPropertiesErrorCases() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    catalog()
+        .buildView(identifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(identifier.namespace())
+        .withQuery("spark", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+
+    assertThatThrownBy(
+            () -> catalog().loadView(identifier).updateProperties().set(null, "new-val1").commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid key: null");
+
+    assertThatThrownBy(
+            () -> catalog().loadView(identifier).updateProperties().set("key1", null).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid value: null");
+
+    assertThatThrownBy(
+            () -> catalog().loadView(identifier).updateProperties().remove(null).commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid key: null");
+
+    assertThatThrownBy(
+            () ->
+                catalog()
+                    .loadView(identifier)
+                    .updateProperties()
+                    .set("key1", "x")
+                    .set("key3", "y")
+                    .remove("key2")
+                    .set("key2", "z")
+                    .commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot remove and update the same key: key2");
+  }
+
+  @Test
+  public void replaceViewVersion() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    SQLViewRepresentation spark =
+        ImmutableSQLViewRepresentation.builder()
+            .dialect("spark")
+            .sql("select * from ns.tbl")
+            .build();
+
+    SQLViewRepresentation trino =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from ns.tbl")
+            .dialect("trino")
+            .build();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery(trino.dialect(), trino.sql())
+            .withQuery(spark.dialect(), spark.sql())
+            .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+
+    ViewVersion viewVersion = view.currentVersion();
+    assertThat(viewVersion.representations()).hasSize(2).containsExactly(trino, spark);
+
+    // uses a different schema and view representation
+    view.replaceVersion()
+        .withSchema(OTHER_SCHEMA)
+        .withQuery(trino.dialect(), trino.sql())
+        .withDefaultCatalog("default")
+        .withDefaultNamespace(identifier.namespace())
+        .commit();
+
+    // history and view versions should reflect the changes
+    View updatedView = catalog().loadView(identifier);
+    assertThat(updatedView.history())
+        .hasSize(2)
+        .element(0)
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(viewVersion.versionId());
+    assertThat(updatedView.history())
+        .element(1)
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(updatedView.currentVersion().versionId());
+    assertThat(updatedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
+    assertThat(updatedView.versions())
+        .hasSize(2)
+        .containsExactly(viewVersion, updatedView.currentVersion());
+
+    ViewVersion updatedViewVersion = updatedView.currentVersion();
+    assertThat(updatedViewVersion).isNotNull();
+    assertThat(updatedViewVersion.versionId()).isEqualTo(viewVersion.versionId() + 1);
+    assertThat(updatedViewVersion.summary()).hasSize(1).containsEntry("operation", "replace");
+    assertThat(updatedViewVersion.operation()).isEqualTo("replace");
+    assertThat(updatedViewVersion.representations()).hasSize(1).containsExactly(trino);
+    assertThat(updatedViewVersion.schemaId()).isEqualTo(1);
+    assertThat(updatedViewVersion.defaultCatalog()).isEqualTo("default");
+    assertThat(updatedViewVersion.defaultNamespace()).isEqualTo(identifier.namespace());
+
+    assertThat(catalog().dropView(identifier)).isTrue();
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+  }
+
+  @Test
+  public void replaceViewVersionByUpdatingSQLForDialect() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    SQLViewRepresentation spark =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from ns.tbl")
+            .dialect("spark")
+            .build();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery(spark.dialect(), spark.sql())
+            .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+
+    ViewVersion viewVersion = view.currentVersion();
+    assertThat(viewVersion.representations()).hasSize(1).containsExactly(spark);
+
+    SQLViewRepresentation updatedSpark =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from ns.updated_tbl")
+            .dialect("spark")
+            .build();
+
+    // only update the SQL for spark
+    view.replaceVersion()
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(identifier.namespace())
+        .withQuery(updatedSpark.dialect(), updatedSpark.sql())
+        .commit();
+
+    // history and view versions should reflect the changes
+    View updatedView = catalog().loadView(identifier);
+    assertThat(updatedView.history())
+        .hasSize(2)
+        .element(0)
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(viewVersion.versionId());
+    assertThat(updatedView.history())
+        .element(1)
+        .extracting(ViewHistoryEntry::versionId)
+        .isEqualTo(updatedView.currentVersion().versionId());
+    assertThat(updatedView.versions())
+        .hasSize(2)
+        .containsExactly(viewVersion, updatedView.currentVersion());
+
+    // updated view should have the new SQL
+    assertThat(updatedView.currentVersion().representations())
+        .hasSize(1)
+        .containsExactly(updatedSpark);
+
+    assertThat(catalog().dropView(identifier)).isTrue();
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+  }
+
+  @Test
+  public void replaceViewVersionErrorCases() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    SQLViewRepresentation trino =
+        ImmutableSQLViewRepresentation.builder()
+            .sql("select * from ns.tbl")
+            .dialect("trino")
+            .build();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery(trino.dialect(), trino.sql())
+            .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+
+    // empty commits are not allowed
+    assertThatThrownBy(() -> view.replaceVersion().commit())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot replace view without specifying a query");
+
+    // schema is required
+    assertThatThrownBy(
+            () ->
+                view.replaceVersion()
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withDefaultNamespace(identifier.namespace())
+                    .commit())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot replace view without specifying schema");
+
+    // default namespace is required
+    assertThatThrownBy(
+            () ->
+                view.replaceVersion()
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withSchema(SCHEMA)
+                    .commit())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot replace view without specifying a default namespace");
+
+    // cannot define multiple SQLs for same dialect
+    assertThatThrownBy(
+            () ->
+                view.replaceVersion()
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .withQuery(trino.dialect(), trino.sql())
+                    .withQuery(trino.dialect(), trino.sql())
+                    .commit())
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid view version: Cannot add multiple queries for dialect trino");
+  }
+
+  @Test
+  public void updateViewPropertiesConflict() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("trino", "select * from ns.tbl")
+            .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+    UpdateViewProperties updateViewProperties = view.updateProperties();
+
+    // drop view and then try to use the updateProperties API
+    catalog().dropView(identifier);
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    assertThatThrownBy(() -> updateViewProperties.set("key1", "val1").commit())
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Cannot commit");
+  }
+
+  @Test
+  public void replaceViewVersionConflict() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    View view =
+        catalog()
+            .buildView(identifier)
+            .withSchema(SCHEMA)
+            .withDefaultNamespace(identifier.namespace())
+            .withQuery("trino", "select * from ns.tbl")
+            .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+    ReplaceViewVersion replaceViewVersion = view.replaceVersion();
+
+    // drop view and then try to use the replaceVersion API
+    catalog().dropView(identifier);
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    assertThatThrownBy(
+            () ->
+                replaceViewVersion
+                    .withQuery("trino", "select * from ns.tbl")
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .commit())
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessageContaining("Cannot commit");
+  }
+
+  @Test
+  public void createViewConflict() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+    ViewBuilder viewBuilder = catalog().buildView(identifier);
+
+    catalog()
+        .buildView(identifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(identifier.namespace())
+        .withQuery("trino", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+
+    // the view was already created concurrently
+    assertThatThrownBy(
+            () ->
+                viewBuilder
+                    .withQuery("trino", "select * from ns.tbl")
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .create())
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("View already exists: ns.view");
+  }
+
+  @Test
+  public void replaceViewConflict() {
+    TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+    if (requiresNamespaceCreate()) {
+      catalog().createNamespace(identifier.namespace());
+    }
+
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    catalog()
+        .buildView(identifier)
+        .withSchema(SCHEMA)
+        .withDefaultNamespace(identifier.namespace())
+        .withQuery("trino", "select * from ns.tbl")
+        .create();
+
+    assertThat(catalog().viewExists(identifier)).as("View should exist").isTrue();
+    ViewBuilder viewBuilder = catalog().buildView(identifier);
+
+    catalog().dropView(identifier);
+    assertThat(catalog().viewExists(identifier)).as("View should not exist").isFalse();
+
+    // the view was already dropped concurrently
+    assertThatThrownBy(
+            () ->
+                viewBuilder
+                    .withQuery("trino", "select * from ns.tbl")
+                    .withSchema(SCHEMA)
+                    .withDefaultNamespace(identifier.namespace())
+                    .replace())
+        .isInstanceOf(NoSuchViewException.class)
+        .hasMessageStartingWith("View does not exist: ns.view");
+  }
+}
diff --git a/core/src/test/resources/org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json b/core/src/test/resources/org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json
new file mode 100644
index 0000000000..f5bc045294
--- /dev/null
+++ b/core/src/test/resources/org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json
@@ -0,0 +1,63 @@
+{
+  "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+  "format-version": 1,
+  "location": "s3://bucket/test/location",
+  "properties": {"some-key": "some-value"},
+  "current-schema-id": 0,
+  "schemas": [
+    {
+      "type": "struct",
+      "schema-id": 0,
+      "fields": [
+        {
+          "id": 1,
+          "name": "x",
+          "required": true,
+          "type": "long"
+        },
+        {
+          "id": 2,
+          "name": "y",
+          "required": true,
+          "type": "long",
+          "doc": "comment"
+        },
+        {
+          "id": 3,
+          "name": "z",
+          "required": true,
+          "type": "long"
+        }
+      ]
+    }
+  ],
+  "current-version-id": 1,
+  "versions": [
+    {
+      "version-id": 1,
+      "timestamp-ms": 4353,
+      "summary": {"operation":"create"},
+      "schema-id": 0,
+      "default-catalog": "some-catalog",
+      "default-namespace": [],
+      "representations": [
+        {
+          "type": "sql",
+          "sql": "select 'foo' foo",
+          "dialect": "spark-sql"
+        },
+        {
+          "type": "sql",
+          "sql": "select * from foo",
+          "dialect": "spark-sql"
+        }
+      ]
+    }
+  ],
+  "version-log": [
+    {
+      "timestamp-ms": 4353,
+      "version-id": 1
+    }
+  ]
+}
\ No newline at end of file

Reply via email to