This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6100efc1a7 Core: Add remaining View APIs and support for
InMemoryCatalog (#7880)
6100efc1a7 is described below
commit 6100efc1a764b204e5a879e0ae6d535a71301249
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Sep 29 00:24:07 2023 +0200
Core: Add remaining View APIs and support for InMemoryCatalog (#7880)
---
.../apache/iceberg/inmemory/InMemoryCatalog.java | 298 +++--
.../iceberg/view/BaseMetastoreViewCatalog.java | 220 ++++
.../java/org/apache/iceberg/view/BaseView.java | 89 ++
.../apache/iceberg/view/BaseViewOperations.java | 220 ++++
.../org/apache/iceberg/view/PropertiesUpdate.java | 99 ++
.../java/org/apache/iceberg/view/ViewMetadata.java | 12 +
.../org/apache/iceberg/view/ViewOperations.java | 61 +
.../java/org/apache/iceberg/view/ViewUtil.java | 29 +
.../apache/iceberg/view/ViewVersionReplace.java | 128 ++
.../iceberg/inmemory/TestInMemoryViewCatalog.java | 49 +
.../org/apache/iceberg/view/TestViewMetadata.java | 31 +
.../iceberg/view/TestViewMetadataParser.java | 82 ++
.../org/apache/iceberg/view/ViewCatalogTests.java | 1340 ++++++++++++++++++++
.../view/ViewMetadataMultipleSQLsForDialect.json | 63 +
14 files changed, 2649 insertions(+), 72 deletions(-)
diff --git
a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
index 3956e9192a..566e56eec9 100644
--- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
-import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
@@ -42,23 +41,30 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.view.BaseMetastoreViewCatalog;
+import org.apache.iceberg.view.BaseViewOperations;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.iceberg.view.ViewUtil;
/**
* Catalog implementation that uses in-memory data-structures to store the
namespaces and tables.
* This class doesn't touch external resources and can be utilized to write
unit tests without side
* effects. It uses {@link InMemoryFileIO}.
*/
-public class InMemoryCatalog extends BaseMetastoreCatalog implements
SupportsNamespaces, Closeable {
+public class InMemoryCatalog extends BaseMetastoreViewCatalog
+ implements SupportsNamespaces, Closeable {
private static final Joiner SLASH = Joiner.on("/");
private static final Joiner DOT = Joiner.on(".");
private final ConcurrentMap<Namespace, Map<String, String>> namespaces;
private final ConcurrentMap<TableIdentifier, String> tables;
+ private final ConcurrentMap<TableIdentifier, String> views;
private FileIO io;
private String catalogName;
private String warehouseLocation;
@@ -66,6 +72,7 @@ public class InMemoryCatalog extends BaseMetastoreCatalog
implements SupportsNam
public InMemoryCatalog() {
this.namespaces = Maps.newConcurrentMap();
this.tables = Maps.newConcurrentMap();
+ this.views = Maps.newConcurrentMap();
}
@Override
@@ -111,8 +118,10 @@ public class InMemoryCatalog extends BaseMetastoreCatalog
implements SupportsNam
lastMetadata = null;
}
- if (null == tables.remove(tableIdentifier)) {
- return false;
+ synchronized (this) {
+ if (null == tables.remove(tableIdentifier)) {
+ return false;
+ }
}
if (purge && lastMetadata != null) {
@@ -136,27 +145,29 @@ public class InMemoryCatalog extends BaseMetastoreCatalog
implements SupportsNam
}
@Override
- public synchronized void renameTable(TableIdentifier from, TableIdentifier
to) {
+ public void renameTable(TableIdentifier from, TableIdentifier to) {
if (from.equals(to)) {
return;
}
- if (!namespaceExists(to.namespace())) {
- throw new NoSuchNamespaceException(
- "Cannot rename %s to %s. Namespace does not exist: %s", from, to,
to.namespace());
- }
+ synchronized (this) {
+ if (!namespaceExists(to.namespace())) {
+ throw new NoSuchNamespaceException(
+ "Cannot rename %s to %s. Namespace does not exist: %s", from, to,
to.namespace());
+ }
- String fromLocation = tables.get(from);
- if (null == fromLocation) {
- throw new NoSuchTableException("Cannot rename %s to %s. Table does not
exist", from, to);
- }
+ String fromLocation = tables.get(from);
+ if (null == fromLocation) {
+ throw new NoSuchTableException("Cannot rename %s to %s. Table does not
exist", from, to);
+ }
- if (tables.containsKey(to)) {
- throw new AlreadyExistsException("Cannot rename %s to %s. Table already
exists", from, to);
- }
+ if (tables.containsKey(to)) {
+ throw new AlreadyExistsException("Cannot rename %s to %s. Table
already exists", from, to);
+ }
- tables.put(to, fromLocation);
- tables.remove(from);
+ tables.put(to, fromLocation);
+ tables.remove(from);
+ }
}
@Override
@@ -166,12 +177,14 @@ public class InMemoryCatalog extends BaseMetastoreCatalog
implements SupportsNam
@Override
public void createNamespace(Namespace namespace, Map<String, String>
metadata) {
- if (namespaceExists(namespace)) {
- throw new AlreadyExistsException(
- "Cannot create namespace %s. Namespace already exists", namespace);
- }
+ synchronized (this) {
+ if (namespaceExists(namespace)) {
+ throw new AlreadyExistsException(
+ "Cannot create namespace %s. Namespace already exists", namespace);
+ }
- namespaces.put(namespace, ImmutableMap.copyOf(metadata));
+ namespaces.put(namespace, ImmutableMap.copyOf(metadata));
+ }
}
@Override
@@ -181,50 +194,59 @@ public class InMemoryCatalog extends BaseMetastoreCatalog
implements SupportsNam
@Override
public boolean dropNamespace(Namespace namespace) throws
NamespaceNotEmptyException {
- if (!namespaceExists(namespace)) {
- return false;
- }
+ synchronized (this) {
+ if (!namespaceExists(namespace)) {
+ return false;
+ }
- List<TableIdentifier> tableIdentifiers = listTables(namespace);
- if (!tableIdentifiers.isEmpty()) {
- throw new NamespaceNotEmptyException(
- "Namespace %s is not empty. Contains %d table(s).", namespace,
tableIdentifiers.size());
- }
+ List<TableIdentifier> tableIdentifiers = listTables(namespace);
+ if (!tableIdentifiers.isEmpty()) {
+ throw new NamespaceNotEmptyException(
+ "Namespace %s is not empty. Contains %d table(s).", namespace,
tableIdentifiers.size());
+ }
- return namespaces.remove(namespace) != null;
+ return namespaces.remove(namespace) != null;
+ }
}
@Override
public boolean setProperties(Namespace namespace, Map<String, String>
properties)
throws NoSuchNamespaceException {
- if (!namespaceExists(namespace)) {
- throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
- }
+ synchronized (this) {
+ if (!namespaceExists(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
- namespaces.computeIfPresent(
- namespace,
- (k, v) ->
- ImmutableMap.<String,
String>builder().putAll(v).putAll(properties).buildKeepingLast());
+ namespaces.computeIfPresent(
+ namespace,
+ (k, v) ->
+ ImmutableMap.<String, String>builder()
+ .putAll(v)
+ .putAll(properties)
+ .buildKeepingLast());
- return true;
+ return true;
+ }
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> properties)
throws NoSuchNamespaceException {
- if (!namespaceExists(namespace)) {
- throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
- }
+ synchronized (this) {
+ if (!namespaceExists(namespace)) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
- namespaces.computeIfPresent(
- namespace,
- (k, v) -> {
- Map<String, String> newProperties = Maps.newHashMap(v);
- properties.forEach(newProperties::remove);
- return ImmutableMap.copyOf(newProperties);
- });
+ namespaces.computeIfPresent(
+ namespace,
+ (k, v) -> {
+ Map<String, String> newProperties = Maps.newHashMap(v);
+ properties.forEach(newProperties::remove);
+ return ImmutableMap.copyOf(newProperties);
+ });
- return true;
+ return true;
+ }
}
@Override
@@ -278,15 +300,73 @@ public class InMemoryCatalog extends BaseMetastoreCatalog
implements SupportsNam
public void close() throws IOException {
namespaces.clear();
tables.clear();
+ views.clear();
+ }
+
+ @Override
+ public List<TableIdentifier> listViews(Namespace namespace) {
+ if (!namespaceExists(namespace) && !namespace.isEmpty()) {
+ throw new NoSuchNamespaceException(
+ "Cannot list views for namespace. Namespace does not exist: %s",
namespace);
+ }
+
+ return views.keySet().stream()
+ .filter(v -> namespace.isEmpty() || v.namespace().equals(namespace))
+ .sorted(Comparator.comparing(TableIdentifier::toString))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
+ return new InMemoryViewOperations(io, identifier);
+ }
+
+ @Override
+ public boolean dropView(TableIdentifier identifier) {
+ synchronized (this) {
+ return null != views.remove(identifier);
+ }
+ }
+
+ @Override
+ public void renameView(TableIdentifier from, TableIdentifier to) {
+ if (from.equals(to)) {
+ return;
+ }
+
+ synchronized (this) {
+ if (!namespaceExists(to.namespace())) {
+ throw new NoSuchNamespaceException(
+ "Cannot rename %s to %s. Namespace does not exist: %s", from, to,
to.namespace());
+ }
+
+ String fromViewLocation = views.get(from);
+ if (null == fromViewLocation) {
+ throw new NoSuchViewException("Cannot rename %s to %s. View does not
exist", from, to);
+ }
+
+ if (tables.containsKey(to)) {
+ throw new AlreadyExistsException("Cannot rename %s to %s. Table
already exists", from, to);
+ }
+
+ if (views.containsKey(to)) {
+ throw new AlreadyExistsException("Cannot rename %s to %s. View already
exists", from, to);
+ }
+
+ views.put(to, fromViewLocation);
+ views.remove(from);
+ }
}
private class InMemoryTableOperations extends BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableIdentifier tableIdentifier;
+ private final String fullTableName;
InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
this.fileIO = fileIO;
this.tableIdentifier = tableIdentifier;
+ this.fullTableName = fullTableName(catalogName, tableIdentifier);
}
@Override
@@ -301,40 +381,114 @@ public class InMemoryCatalog extends
BaseMetastoreCatalog implements SupportsNam
@Override
public void doCommit(TableMetadata base, TableMetadata metadata) {
- String newLocation = writeNewMetadata(metadata, currentVersion() + 1);
+ String newLocation = writeNewMetadataIfRequired(base == null, metadata);
String oldLocation = base == null ? null : base.metadataFileLocation();
- if (null == base && !namespaceExists(tableIdentifier.namespace())) {
- throw new NoSuchNamespaceException(
- "Cannot create table %s. Namespace does not exist: %s",
- tableIdentifier, tableIdentifier.namespace());
+ synchronized (InMemoryCatalog.this) {
+ if (null == base && !namespaceExists(tableIdentifier.namespace())) {
+ throw new NoSuchNamespaceException(
+ "Cannot create table %s. Namespace does not exist: %s",
+ tableIdentifier, tableIdentifier.namespace());
+ }
+
+ if (views.containsKey(tableIdentifier)) {
+ throw new AlreadyExistsException(
+ "View with same name already exists: %s", tableIdentifier);
+ }
+
+ tables.compute(
+ tableIdentifier,
+ (k, existingLocation) -> {
+ if (!Objects.equal(existingLocation, oldLocation)) {
+ if (null == base) {
+ throw new AlreadyExistsException("Table already exists: %s",
tableName());
+ }
+
+ throw new CommitFailedException(
+ "Cannot commit to table %s metadata location from %s to %s
"
+ + "because it has been concurrently modified to %s",
+ tableIdentifier, oldLocation, newLocation,
existingLocation);
+ }
+ return newLocation;
+ });
+ }
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIO;
+ }
+
+ @Override
+ protected String tableName() {
+ return fullTableName;
+ }
+ }
+
+ private class InMemoryViewOperations extends BaseViewOperations {
+ private final FileIO io;
+ private final TableIdentifier identifier;
+ private final String fullViewName;
+
+ InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
+ this.io = io;
+ this.identifier = identifier;
+ this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
+ }
+
+ @Override
+ public void doRefresh() {
+ String latestLocation = views.get(identifier);
+ if (latestLocation == null) {
+ disableRefresh();
+ } else {
+ refreshFromMetadataLocation(latestLocation);
}
+ }
- tables.compute(
- tableIdentifier,
- (k, existingLocation) -> {
- if (!Objects.equal(existingLocation, oldLocation)) {
- if (null == base) {
- throw new AlreadyExistsException("Table already exists: %s",
tableName());
+ @Override
+ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+ String newLocation = writeNewMetadataIfRequired(metadata);
+ String oldLocation = base == null ? null : currentMetadataLocation();
+
+ synchronized (InMemoryCatalog.this) {
+ if (null == base && !namespaceExists(identifier.namespace())) {
+ throw new NoSuchNamespaceException(
+ "Cannot create view %s. Namespace does not exist: %s",
+ identifier, identifier.namespace());
+ }
+
+ if (tables.containsKey(identifier)) {
+ throw new AlreadyExistsException("Table with same name already
exists: %s", identifier);
+ }
+
+ views.compute(
+ identifier,
+ (k, existingLocation) -> {
+ if (!Objects.equal(existingLocation, oldLocation)) {
+ if (null == base) {
+ throw new AlreadyExistsException("View already exists: %s",
identifier);
+ }
+
+ throw new CommitFailedException(
+ "Cannot commit to view %s metadata location from %s to %s "
+ + "because it has been concurrently modified to %s",
+ identifier, oldLocation, newLocation, existingLocation);
}
- throw new CommitFailedException(
- "Cannot commit to table %s metadata location from %s to %s "
- + "because it has been concurrently modified to %s",
- tableIdentifier, oldLocation, newLocation, existingLocation);
- }
- return newLocation;
- });
+ return newLocation;
+ });
+ }
}
@Override
public FileIO io() {
- return fileIO;
+ return io;
}
@Override
- protected String tableName() {
- return tableIdentifier.toString();
+ protected String viewName() {
+ return fullViewName;
}
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
new file mode 100644
index 0000000000..42eb80a047
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public abstract class BaseMetastoreViewCatalog extends BaseMetastoreCatalog
implements ViewCatalog {
+ protected abstract ViewOperations newViewOps(TableIdentifier identifier);
+
+ @Override
+ public void initialize(String name, Map<String, String> properties) {
+ super.initialize(name, properties);
+ }
+
+ @Override
+ public String name() {
+ return super.name();
+ }
+
+ @Override
+ public View loadView(TableIdentifier identifier) {
+ if (isValidIdentifier(identifier)) {
+ ViewOperations ops = newViewOps(identifier);
+ if (ops.current() == null) {
+ throw new NoSuchViewException("View does not exist: %s", identifier);
+ } else {
+ return new BaseView(newViewOps(identifier),
ViewUtil.fullViewName(name(), identifier));
+ }
+ }
+
+ throw new NoSuchViewException("Invalid view identifier: %s", identifier);
+ }
+
+ @Override
+ public ViewBuilder buildView(TableIdentifier identifier) {
+ return new BaseViewBuilder(identifier);
+ }
+
+ protected class BaseViewBuilder implements ViewBuilder {
+ private final TableIdentifier identifier;
+ private final Map<String, String> properties = Maps.newHashMap();
+ private final List<ViewRepresentation> representations =
Lists.newArrayList();
+ private Namespace defaultNamespace = null;
+ private String defaultCatalog = null;
+ private Schema schema = null;
+
+ protected BaseViewBuilder(TableIdentifier identifier) {
+ Preconditions.checkArgument(
+ isValidIdentifier(identifier), "Invalid view identifier: %s",
identifier);
+ this.identifier = identifier;
+ }
+
+ @Override
+ public ViewBuilder withSchema(Schema newSchema) {
+ this.schema = newSchema;
+ return this;
+ }
+
+ @Override
+ public ViewBuilder withQuery(String dialect, String sql) {
+ representations.add(
+
ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build());
+ return this;
+ }
+
+ @Override
+ public ViewBuilder withDefaultCatalog(String catalog) {
+ this.defaultCatalog = catalog;
+ return this;
+ }
+
+ @Override
+ public ViewBuilder withDefaultNamespace(Namespace namespace) {
+ this.defaultNamespace = namespace;
+ return this;
+ }
+
+ @Override
+ public ViewBuilder withProperties(Map<String, String> newProperties) {
+ this.properties.putAll(newProperties);
+ return this;
+ }
+
+ @Override
+ public ViewBuilder withProperty(String key, String value) {
+ this.properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public View create() {
+ return create(newViewOps(identifier));
+ }
+
+ @Override
+ public View replace() {
+ return replace(newViewOps(identifier));
+ }
+
+ @Override
+ public View createOrReplace() {
+ ViewOperations ops = newViewOps(identifier);
+ if (null == ops.current()) {
+ return create(ops);
+ } else {
+ return replace(ops);
+ }
+ }
+
+ private View create(ViewOperations ops) {
+ if (null != ops.current()) {
+ throw new AlreadyExistsException("View already exists: %s",
identifier);
+ }
+
+ Preconditions.checkState(
+ !representations.isEmpty(), "Cannot create view without specifying a
query");
+ Preconditions.checkState(null != schema, "Cannot create view without
specifying schema");
+ Preconditions.checkState(
+ null != defaultNamespace, "Cannot create view without specifying a
default namespace");
+
+ ViewVersion viewVersion =
+ ImmutableViewVersion.builder()
+ .versionId(1)
+ .schemaId(schema.schemaId())
+ .addAllRepresentations(representations)
+ .defaultNamespace(defaultNamespace)
+ .defaultCatalog(defaultCatalog)
+ .timestampMillis(System.currentTimeMillis())
+ .putSummary("operation", "create")
+ .build();
+
+ ViewMetadata viewMetadata =
+ ViewMetadata.builder()
+ .setProperties(properties)
+ .setLocation(defaultWarehouseLocation(identifier))
+ .setCurrentVersion(viewVersion, schema)
+ .build();
+
+ try {
+ ops.commit(null, viewMetadata);
+ } catch (CommitFailedException ignored) {
+ throw new AlreadyExistsException("View was created concurrently: %s",
identifier);
+ }
+
+ return new BaseView(ops, ViewUtil.fullViewName(name(), identifier));
+ }
+
+ private View replace(ViewOperations ops) {
+ if (null == ops.current()) {
+ throw new NoSuchViewException("View does not exist: %s", identifier);
+ }
+
+ Preconditions.checkState(
+ !representations.isEmpty(), "Cannot replace view without specifying
a query");
+ Preconditions.checkState(null != schema, "Cannot replace view without
specifying schema");
+ Preconditions.checkState(
+ null != defaultNamespace, "Cannot replace view without specifying a
default namespace");
+
+ ViewMetadata metadata = ops.current();
+ int maxVersionId =
+ metadata.versions().stream()
+ .map(ViewVersion::versionId)
+ .max(Integer::compareTo)
+ .orElseGet(metadata::currentVersionId);
+
+ ViewVersion viewVersion =
+ ImmutableViewVersion.builder()
+ .versionId(maxVersionId + 1)
+ .schemaId(schema.schemaId())
+ .addAllRepresentations(representations)
+ .defaultNamespace(defaultNamespace)
+ .defaultCatalog(defaultCatalog)
+ .timestampMillis(System.currentTimeMillis())
+ .putSummary("operation", "replace")
+ .build();
+
+ ViewMetadata replacement =
+ ViewMetadata.buildFrom(metadata)
+ .setProperties(properties)
+ .setCurrentVersion(viewVersion, schema)
+ .build();
+
+ try {
+ ops.commit(metadata, replacement);
+ } catch (CommitFailedException ignored) {
+ throw new AlreadyExistsException("View was updated concurrently: %s",
identifier);
+ }
+
+ return new BaseView(ops, ViewUtil.fullViewName(name(), identifier));
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseView.java
b/core/src/main/java/org/apache/iceberg/view/BaseView.java
new file mode 100644
index 0000000000..a21bc2381f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/BaseView.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+
+public class BaseView implements View, Serializable {
+
+ private final ViewOperations ops;
+ private final String name;
+
+ public BaseView(ViewOperations ops, String name) {
+ this.ops = ops;
+ this.name = name;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ public ViewOperations operations() {
+ return ops;
+ }
+
+ @Override
+ public Schema schema() {
+ return operations().current().schema();
+ }
+
+ @Override
+ public Map<Integer, Schema> schemas() {
+ return operations().current().schemasById();
+ }
+
+ @Override
+ public ViewVersion currentVersion() {
+ return operations().current().currentVersion();
+ }
+
+ @Override
+ public Iterable<ViewVersion> versions() {
+ return operations().current().versions();
+ }
+
+ @Override
+ public ViewVersion version(int versionId) {
+ return operations().current().version(versionId);
+ }
+
+ @Override
+ public List<ViewHistoryEntry> history() {
+ return operations().current().history();
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return operations().current().properties();
+ }
+
+ @Override
+ public UpdateViewProperties updateProperties() {
+ return new PropertiesUpdate(ops);
+ }
+
+ @Override
+ public ReplaceViewVersion replaceVersion() {
+ return new ViewVersionReplace(ops);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
new file mode 100644
index 0000000000..f7270b9a35
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseViewOperations implements ViewOperations {
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseViewOperations.class);
+
+ private static final String METADATA_FOLDER_NAME = "metadata";
+
+ private ViewMetadata currentMetadata = null;
+ private String currentMetadataLocation = null;
+ private boolean shouldRefresh = true;
+ private int version = -1;
+
+ protected BaseViewOperations() {}
+
+ protected void requestRefresh() {
+ this.shouldRefresh = true;
+ }
+
+ protected void disableRefresh() {
+ this.shouldRefresh = false;
+ }
+
+ protected abstract void doRefresh();
+
+ protected abstract void doCommit(ViewMetadata base, ViewMetadata metadata);
+
+ protected abstract String viewName();
+
+ protected abstract FileIO io();
+
+ protected String currentMetadataLocation() {
+ return currentMetadataLocation;
+ }
+
+ protected int currentVersion() {
+ return version;
+ }
+
+ @Override
+ public ViewMetadata current() {
+ if (shouldRefresh) {
+ return refresh();
+ }
+
+ return currentMetadata;
+ }
+
+ @Override
+ public ViewMetadata refresh() {
+ boolean currentMetadataWasAvailable = currentMetadata != null;
+ try {
+ doRefresh();
+ } catch (NoSuchViewException e) {
+ if (currentMetadataWasAvailable) {
+ LOG.warn("Could not find the view during refresh, setting current
metadata to null", e);
+ shouldRefresh = true;
+ }
+
+ currentMetadata = null;
+ currentMetadataLocation = null;
+ version = -1;
+ throw e;
+ }
+
+ return current();
+ }
+
+ @Override
+ public void commit(ViewMetadata base, ViewMetadata metadata) {
+ // if the metadata is already out of date, reject it
+ if (base != current()) {
+ if (base != null) {
+ throw new CommitFailedException("Cannot commit: stale view metadata");
+ } else {
+ // when current is non-null, the view exists. but when base is null,
the commit is trying
+ // to create the view
+ throw new AlreadyExistsException("View already exists: %s",
viewName());
+ }
+ }
+
+ // if the metadata is not changed, return early
+ if (base == metadata) {
+ LOG.info("Nothing to commit.");
+ return;
+ }
+
+ long start = System.currentTimeMillis();
+ doCommit(base, metadata);
+ requestRefresh();
+
+ LOG.info(
+ "Successfully committed to view {} in {} ms",
+ viewName(),
+ System.currentTimeMillis() - start);
+ }
+
+ private String writeNewMetadata(ViewMetadata metadata, int newVersion) {
+ String newMetadataFilePath = newMetadataFilePath(metadata, newVersion);
+ OutputFile newMetadataLocation = io().newOutputFile(newMetadataFilePath);
+
+ // write the new metadata
+ // use overwrite to avoid negative caching in S3. this is safe because the
metadata location is
+ // always unique because it includes a UUID.
+ ViewMetadataParser.overwrite(metadata, newMetadataLocation);
+
+ return newMetadataLocation.location();
+ }
+
+ protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
+ return null != metadata.metadataFileLocation()
+ ? metadata.metadataFileLocation()
+ : writeNewMetadata(metadata, version + 1);
+ }
+
+ private String newMetadataFilePath(ViewMetadata metadata, int newVersion) {
+ return metadataFileLocation(
+ metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(),
".metadata.json"));
+ }
+
+ private String metadataFileLocation(ViewMetadata metadata, String filename) {
+ return String.format(
+ "%s/%s/%s",
+ LocationUtil.stripTrailingSlash(metadata.location()),
METADATA_FOLDER_NAME, filename);
+ }
+
+ protected void refreshFromMetadataLocation(String newLocation) {
+ refreshFromMetadataLocation(newLocation, null, 20);
+ }
+
+ protected void refreshFromMetadataLocation(
+ String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
+ refreshFromMetadataLocation(
+ newLocation,
+ shouldRetry,
+ numRetries,
+ metadataLocation ->
ViewMetadataParser.read(io().newInputFile(metadataLocation)));
+ }
+
+ protected void refreshFromMetadataLocation(
+ String newLocation,
+ Predicate<Exception> shouldRetry,
+ int numRetries,
+ Function<String, ViewMetadata> metadataLoader) {
+ if (!Objects.equal(currentMetadataLocation, newLocation)) {
+ LOG.info("Refreshing view metadata from new version: {}", newLocation);
+
+ AtomicReference<ViewMetadata> newMetadata = new AtomicReference<>();
+ Tasks.foreach(newLocation)
+ .retry(numRetries)
+ .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
+ .throwFailureWhenFinished()
+ .stopRetryOn(NotFoundException.class) // overridden if shouldRetry
is non-null
+ .shouldRetryTest(shouldRetry)
+ .run(metadataLocation ->
newMetadata.set(metadataLoader.apply(metadataLocation)));
+
+ this.currentMetadata = newMetadata.get();
+ this.currentMetadataLocation = newLocation;
+ this.version = parseVersion(newLocation);
+ }
+
+ this.shouldRefresh = false;
+ }
+
+ /**
+ * Parse the version from view metadata file name.
+ *
+ * @param metadataLocation view metadata file location
+ * @return version of the view metadata file in success case and -1 if the
version is not parsable
+ * (as a sign that the metadata is not part of this catalog)
+ */
+ private static int parseVersion(String metadataLocation) {
+ int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't
found, this will be 0
+ int versionEnd = metadataLocation.indexOf('-', versionStart);
+ if (versionEnd < 0) {
+ // found filesystem view's metadata
+ return -1;
+ }
+
+ try {
+ return Integer.valueOf(metadataLocation.substring(versionStart,
versionEnd));
+ } catch (NumberFormatException e) {
+ LOG.warn("Unable to parse version from metadata location: {}",
metadataLocation, e);
+ return -1;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java
b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java
new file mode 100644
index 0000000000..48bcfc3a68
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+
+class PropertiesUpdate implements UpdateViewProperties {
+ private final ViewOperations ops;
+ private final Map<String, String> updates = Maps.newHashMap();
+ private final Set<String> removals = Sets.newHashSet();
+ private ViewMetadata base;
+
+ PropertiesUpdate(ViewOperations ops) {
+ this.ops = ops;
+ this.base = ops.current();
+ }
+
+ @Override
+ public Map<String, String> apply() {
+ return internalApply().properties();
+ }
+
+ private ViewMetadata internalApply() {
+ this.base = ops.refresh();
+
+ return
ViewMetadata.buildFrom(base).setProperties(updates).removeProperties(removals).build();
+ }
+
+ @Override
+ public void commit() {
+ Tasks.foreach(ops)
+ .retry(
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_NUM_RETRIES,
COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_MIN_RETRY_WAIT_MS,
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_MAX_RETRY_WAIT_MS,
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_TOTAL_RETRY_TIME_MS,
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(taskOps -> taskOps.commit(base, internalApply()));
+ }
+
+ @Override
+ public UpdateViewProperties set(String key, String value) {
+ Preconditions.checkArgument(null != key, "Invalid key: null");
+ Preconditions.checkArgument(null != value, "Invalid value: null");
+ Preconditions.checkArgument(
+ !removals.contains(key), "Cannot remove and update the same key: %s",
key);
+
+ updates.put(key, value);
+ return this;
+ }
+
+ @Override
+ public UpdateViewProperties remove(String key) {
+ Preconditions.checkArgument(null != key, "Invalid key: null");
+ Preconditions.checkArgument(
+ !updates.containsKey(key), "Cannot remove and update the same key:
%s", key);
+
+ removals.add(key);
+ return this;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
index 43dba2f6b2..68619ddb22 100644
--- a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
@@ -35,6 +35,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.immutables.value.Value;
import org.immutables.value.Value.Style.ImplementationVisibility;
@@ -272,6 +273,17 @@ public interface ViewMetadata extends Serializable {
"Cannot add version with unknown schema: %s",
version.schemaId());
+ Set<String> dialects = Sets.newHashSet();
+ for (ViewRepresentation repr : version.representations()) {
+ if (repr instanceof SQLViewRepresentation) {
+ SQLViewRepresentation sql = (SQLViewRepresentation) repr;
+ Preconditions.checkArgument(
+ dialects.add(sql.dialect()),
+ "Invalid view version: Cannot add multiple queries for dialect
%s",
+ sql.dialect());
+ }
+ }
+
ViewVersion newVersion;
if (newVersionId != version.versionId()) {
newVersion =
ImmutableViewVersion.builder().from(version).versionId(newVersionId).build();
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewOperations.java
b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java
new file mode 100644
index 0000000000..37133161b9
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+/** SPI interface to abstract view metadata access and updates. */
+public interface ViewOperations {
+
+ /**
+ * Return the currently loaded view metadata, without checking for updates.
+ *
+ * @return view metadata
+ */
+ ViewMetadata current();
+
+ /**
+ * Return the current view metadata after checking for updates.
+ *
+ * @return view metadata
+ */
+ ViewMetadata refresh();
+
+ /**
+ * Replace the base view metadata with a new version.
+ *
+ * <p>This method should implement and document atomicity guarantees.
+ *
+ * <p>Implementations must check that the base metadata is current to avoid
overwriting updates.
+ * Once the atomic commit operation succeeds, implementations must not
perform any operations that
+ * may fail because failure in this method cannot be distinguished from
commit failure.
+ *
+ * <p>Implementations should throw a {@link
+ * org.apache.iceberg.exceptions.CommitStateUnknownException} in cases where
it cannot be
+ * determined if the commit succeeded or failed. For example if a network
partition causes the
+ * confirmation of the commit to be lost, the implementation should throw a
+ * CommitStateUnknownException. An unknown state indicates to downstream
users of this API that it
+ * is not safe to perform clean up and remove any files. In general, strict
metadata cleanup will
+ * only trigger cleanups when the commit fails with an exception
implementing the marker interface
+ * {@link org.apache.iceberg.exceptions.CleanableFailure}. All other
exceptions will be treated as
+ * if the commit has failed.
+ *
+ * @param base view metadata on which changes were based
+ * @param metadata new view metadata with updates
+ */
+ void commit(ViewMetadata base, ViewMetadata metadata);
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewUtil.java
b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java
new file mode 100644
index 0000000000..b79e2c3ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+public class ViewUtil {
+ private ViewUtil() {}
+
+ public static String fullViewName(String catalog, TableIdentifier ident) {
+ return catalog + "." + ident;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
new file mode 100644
index 0000000000..15f05c531d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+
+class ViewVersionReplace implements ReplaceViewVersion {
+ private final ViewOperations ops;
+ private final List<ViewRepresentation> representations =
Lists.newArrayList();
+ private ViewMetadata base;
+ private Namespace defaultNamespace = null;
+ private String defaultCatalog = null;
+ private Schema schema = null;
+
+ ViewVersionReplace(ViewOperations ops) {
+ this.ops = ops;
+ this.base = ops.current();
+ }
+
+ @Override
+ public ViewVersion apply() {
+ return internalApply().currentVersion();
+ }
+
+ private ViewMetadata internalApply() {
+ Preconditions.checkState(
+ !representations.isEmpty(), "Cannot replace view without specifying a
query");
+ Preconditions.checkState(null != schema, "Cannot replace view without
specifying schema");
+ Preconditions.checkState(
+ null != defaultNamespace, "Cannot replace view without specifying a
default namespace");
+
+ this.base = ops.refresh();
+
+ ViewVersion viewVersion = base.currentVersion();
+ int maxVersionId =
+ base.versions().stream()
+ .map(ViewVersion::versionId)
+ .max(Integer::compareTo)
+ .orElseGet(viewVersion::versionId);
+
+ ViewVersion newVersion =
+ ImmutableViewVersion.builder()
+ .versionId(maxVersionId + 1)
+ .timestampMillis(System.currentTimeMillis())
+ .schemaId(schema.schemaId())
+ .defaultNamespace(defaultNamespace)
+ .defaultCatalog(defaultCatalog)
+ .putSummary("operation", "replace")
+ .addAllRepresentations(representations)
+ .build();
+
+ return ViewMetadata.buildFrom(base).setCurrentVersion(newVersion,
schema).build();
+ }
+
+ @Override
+ public void commit() {
+ Tasks.foreach(ops)
+ .retry(
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_NUM_RETRIES,
COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_MIN_RETRY_WAIT_MS,
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_MAX_RETRY_WAIT_MS,
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_TOTAL_RETRY_TIME_MS,
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(taskOps -> taskOps.commit(base, internalApply()));
+ }
+
+ @Override
+ public ReplaceViewVersion withQuery(String dialect, String sql) {
+
representations.add(ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build());
+ return this;
+ }
+
+ @Override
+ public ReplaceViewVersion withSchema(Schema newSchema) {
+ this.schema = newSchema;
+ return this;
+ }
+
+ @Override
+ public ReplaceViewVersion withDefaultCatalog(String catalog) {
+ this.defaultCatalog = catalog;
+ return this;
+ }
+
+ @Override
+ public ReplaceViewVersion withDefaultNamespace(Namespace namespace) {
+ this.defaultNamespace = namespace;
+ return this;
+ }
+}
diff --git
a/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java
b/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java
new file mode 100644
index 0000000000..76731f58a6
--- /dev/null
+++
b/core/src/test/java/org/apache/iceberg/inmemory/TestInMemoryViewCatalog.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.inmemory;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.view.ViewCatalogTests;
+import org.junit.jupiter.api.BeforeEach;
+
+public class TestInMemoryViewCatalog extends ViewCatalogTests<InMemoryCatalog>
{
+ private InMemoryCatalog catalog;
+
+ @BeforeEach
+ public void before() {
+ this.catalog = new InMemoryCatalog();
+ this.catalog.initialize("in-memory-catalog", ImmutableMap.of());
+ }
+
+ @Override
+ protected InMemoryCatalog catalog() {
+ return catalog;
+ }
+
+ @Override
+ protected Catalog tableCatalog() {
+ return catalog;
+ }
+
+ @Override
+ protected boolean requiresNamespaceCreate() {
+ return true;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
b/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
index 4706640f53..7837d94055 100644
--- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadata.java
@@ -727,4 +727,35 @@ public class TestViewMetadata {
.containsExactly(schemaOne.asStruct(), schemaTwo.asStruct(),
schemaThree.asStruct());
assertThat(viewMetadata.schemasById().keySet()).containsExactly(0, 1, 2);
}
+
+ @Test
+ public void viewMetadataWithMultipleSQLForSameDialect() {
+ assertThatThrownBy(
+ () ->
+ ViewMetadata.builder()
+ .setLocation("custom-location")
+ .addSchema(new Schema(Types.NestedField.required(1, "x",
Types.LongType.get())))
+ .addVersion(
+ ImmutableViewVersion.builder()
+ .schemaId(0)
+ .versionId(1)
+ .timestampMillis(23L)
+ .putSummary("operation", "create")
+ .defaultNamespace(Namespace.of("ns"))
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .dialect("spark")
+ .sql("select * from ns.tbl")
+ .build())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .dialect("spark")
+ .sql("select * from ns.tbl2")
+ .build())
+ .build())
+ .setCurrentVersionId(1)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid view version: Cannot add multiple queries for
dialect spark");
+ }
}
diff --git
a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
index 5fb9589522..267195c133 100644
--- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
+++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
@@ -226,4 +227,85 @@ public class TestViewMetadataParser {
.isEqualTo(expectedViewMetadata);
assertThat(actual.metadataFileLocation()).isEqualTo(metadataLocation);
}
+
+ @Test
+ public void viewMetadataWithMultipleSQLsForDialectShouldBeReadable() throws
Exception {
+ ViewVersion viewVersion =
+ ImmutableViewVersion.builder()
+ .versionId(1)
+ .timestampMillis(4353L)
+ .summary(ImmutableMap.of("operation", "create"))
+ .schemaId(0)
+ .defaultCatalog("some-catalog")
+ .defaultNamespace(Namespace.empty())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select 'foo' foo")
+ .dialect("spark-sql")
+ .build())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from foo")
+ .dialect("spark-sql")
+ .build())
+ .build();
+
+ String json =
+ readViewMetadataInputFile(
+ "org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json");
+
+ // builder will throw an exception due to having multiple SQLs for the
same dialect, thus
+ // construct the expected view metadata directly
+ ViewMetadata expectedViewMetadata =
+ ImmutableViewMetadata.of(
+ "fa6506c3-7681-40c8-86dc-e36561f83385",
+ 1,
+ "s3://bucket/test/location",
+ ImmutableList.of(TEST_SCHEMA),
+ 1,
+ ImmutableList.of(viewVersion),
+ ImmutableList.of(
+
ImmutableViewHistoryEntry.builder().versionId(1).timestampMillis(4353).build()),
+ ImmutableMap.of("some-key", "some-value"),
+ ImmutableList.of(),
+ null);
+
+ // reading view metadata with multiple SQLs for the same dialects
shouldn't fail
+ ViewMetadata actual = ViewMetadataParser.fromJson(json);
+ assertThat(actual)
+ .usingRecursiveComparison()
+ .ignoringFieldsOfTypes(Schema.class)
+ .isEqualTo(expectedViewMetadata);
+ }
+
+ @Test
+ public void replaceViewMetadataWithMultipleSQLsForDialect() throws Exception
{
+ String json =
+ readViewMetadataInputFile(
+ "org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json");
+
+ // reading view metadata with multiple SQLs for the same dialects
shouldn't fail
+ ViewMetadata invalid = ViewMetadataParser.fromJson(json);
+
+ // replace metadata with a new view version that fixes the SQL
representations
+ ViewVersion viewVersion =
+ ImmutableViewVersion.builder()
+ .versionId(2)
+ .schemaId(0)
+ .timestampMillis(5555L)
+ .summary(ImmutableMap.of("operation", "replace"))
+ .defaultCatalog("some-catalog")
+ .defaultNamespace(Namespace.empty())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from foo")
+ .dialect("spark-sql")
+ .build())
+ .build();
+
+ ViewMetadata replaced =
+
ViewMetadata.buildFrom(invalid).addVersion(viewVersion).setCurrentVersionId(2).build();
+
+ assertThat(replaced.currentVersion()).isEqualTo(viewVersion);
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
new file mode 100644
index 0000000000..1c95955383
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
@@ -0,0 +1,1340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assumptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public abstract class ViewCatalogTests<C extends ViewCatalog &
SupportsNamespaces> {
+ protected static final Schema SCHEMA =
+ new Schema(
+ 5,
+ required(3, "id", Types.IntegerType.get(), "unique ID"),
+ required(4, "data", Types.StringType.get()));
+
+ private static final Schema OTHER_SCHEMA =
+ new Schema(7, required(1, "some_id", Types.IntegerType.get()));
+
+ protected abstract C catalog();
+
+ protected abstract Catalog tableCatalog();
+
+ protected boolean requiresNamespaceCreate() {
+ return false;
+ }
+
+ @Test
+ public void basicCreateView() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(view).isNotNull();
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ // validate view settings
+ assertThat(view.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(),
identifier));
+ assertThat(view.history())
+ .hasSize(1)
+ .first()
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(1);
+ assertThat(view.schema().schemaId()).isEqualTo(0);
+ assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
+ assertThat(view.schemas()).hasSize(1).containsKey(0);
+
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());
+
+ assertThat(view.currentVersion())
+ .isEqualTo(
+ ImmutableViewVersion.builder()
+ .timestampMillis(view.currentVersion().timestampMillis())
+ .versionId(1)
+ .schemaId(0)
+ .putSummary("operation", "create")
+ .defaultNamespace(identifier.namespace())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("spark")
+ .build())
+ .build());
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void completeCreateView() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withDefaultCatalog(catalog().name())
+ .withQuery("spark", "select * from ns.tbl")
+ .withQuery("trino", "select * from ns.tbl using X")
+ .withProperty("prop1", "val1")
+ .withProperty("prop2", "val2")
+ .create();
+
+ assertThat(view).isNotNull();
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ // validate view settings
+ assertThat(view.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(),
identifier));
+ assertThat(view.properties()).containsEntry("prop1",
"val1").containsEntry("prop2", "val2");
+ assertThat(view.history())
+ .hasSize(1)
+ .first()
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(1);
+ assertThat(view.schema().schemaId()).isEqualTo(0);
+ assertThat(view.schema().asStruct()).isEqualTo(SCHEMA.asStruct());
+ assertThat(view.schemas()).hasSize(1).containsKey(0);
+
assertThat(view.versions()).hasSize(1).containsExactly(view.currentVersion());
+
+ assertThat(view.currentVersion())
+ .isEqualTo(
+ ImmutableViewVersion.builder()
+ .timestampMillis(view.currentVersion().timestampMillis())
+ .versionId(1)
+ .schemaId(0)
+ .putSummary("operation", "create")
+ .defaultNamespace(identifier.namespace())
+ .defaultCatalog(catalog().name())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("spark")
+ .build())
+ .addRepresentations(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl using X")
+ .dialect("trino")
+ .build())
+ .build());
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void createViewErrorCases() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ SQLViewRepresentation trino =
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("trino")
+ .build();
+
+ // query is required
+ assertThatThrownBy(() -> catalog().buildView(identifier).create())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot create view without specifying a query");
+
+ // schema is required
+ assertThatThrownBy(
+ () -> catalog().buildView(identifier).withQuery(trino.dialect(),
trino.sql()).create())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot create view without specifying schema");
+
+ // default namespace is required
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(identifier)
+ .withQuery(trino.dialect(), trino.sql())
+ .withSchema(SCHEMA)
+ .create())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot create view without specifying a default
namespace");
+
+ // cannot define multiple SQLs for same dialect
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(trino.dialect(), trino.sql())
+ .withQuery(trino.dialect(), trino.sql())
+ .create())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid view version: Cannot add multiple queries for
dialect trino");
+ }
+
+ @Test
+ public void createViewThatAlreadyExists() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(view).isNotNull();
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(identifier)
+ .withSchema(OTHER_SCHEMA)
+ .withQuery("spark", "select * from ns.tbl")
+ .withDefaultNamespace(identifier.namespace())
+ .create())
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageStartingWith("View already exists: ns.view");
+ }
+
+ @Test
+ public void createViewThatAlreadyExistsAsTable() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(tableIdentifier.namespace());
+ }
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
not exist").isFalse();
+
+ tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
exist").isTrue();
+
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(tableIdentifier)
+ .withSchema(OTHER_SCHEMA)
+ .withDefaultNamespace(tableIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create())
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageStartingWith("Table with same name already exists:
ns.table");
+ }
+
+ @Test
+ public void createTableThatAlreadyExistsAsView() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(viewIdentifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(viewIdentifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should
exist").isTrue();
+
+ assertThatThrownBy(() -> tableCatalog().buildTable(viewIdentifier,
SCHEMA).create())
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageStartingWith("View with same name already exists: ns.view");
+ }
+
+ @Test
+ public void createTableViaTransactionThatAlreadyExistsAsView() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(viewIdentifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should not
exist").isFalse();
+
+ Transaction transaction = tableCatalog().buildTable(viewIdentifier,
SCHEMA).createTransaction();
+
+ catalog()
+ .buildView(viewIdentifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should
exist").isTrue();
+
+ assertThatThrownBy(transaction::commitTransaction)
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageStartingWith("View with same name already exists: ns.view");
+ }
+
+ @Test
+ public void replaceTableViaTransactionThatAlreadyExistsAsView() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(viewIdentifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(viewIdentifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should
exist").isTrue();
+
+ // replace transaction requires table existence
+ // TODO: replace should check whether the table exists as a view
+ assertThatThrownBy(
+ () ->
+ tableCatalog()
+ .buildTable(viewIdentifier, SCHEMA)
+ .replaceTransaction()
+ .commitTransaction())
+ .isInstanceOf(NoSuchTableException.class)
+ .hasMessageStartingWith("Table does not exist: ns.view");
+ }
+
+ @Test
+ public void createOrReplaceTableViaTransactionThatAlreadyExistsAsView() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(viewIdentifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(viewIdentifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should
exist").isTrue();
+
+ assertThatThrownBy(
+ () ->
+ tableCatalog()
+ .buildTable(viewIdentifier, SCHEMA)
+ .createOrReplaceTransaction()
+ .commitTransaction())
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageStartingWith("View with same name already exists: ns.view");
+ }
+
+ @Test
+ public void replaceViewThatAlreadyExistsAsTable() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(tableIdentifier.namespace());
+ }
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
not exist").isFalse();
+
+ tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
exist").isTrue();
+
+ // replace view requires the view to exist
+ // TODO: replace should check whether the view exists as a table
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(tableIdentifier)
+ .withSchema(OTHER_SCHEMA)
+ .withDefaultNamespace(tableIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .replace())
+ .isInstanceOf(NoSuchViewException.class)
+ .hasMessageStartingWith("View does not exist: ns.table");
+ }
+
+ @Test
+ public void createOrReplaceViewThatAlreadyExistsAsTable() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(tableIdentifier.namespace());
+ }
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
not exist").isFalse();
+
+ tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
exist").isTrue();
+
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(tableIdentifier)
+ .withSchema(OTHER_SCHEMA)
+ .withDefaultNamespace(tableIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .createOrReplace())
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageStartingWith("Table with same name already exists:
ns.table");
+ }
+
+ @Test
+ public void renameView() {
+ TableIdentifier from = TableIdentifier.of("ns", "view");
+ TableIdentifier to = TableIdentifier.of("ns", "renamedView");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(from.namespace());
+ }
+
+ assertThat(catalog().viewExists(from)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(from)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(from.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+ ViewMetadata original = ((BaseView) view).operations().current();
+
+ catalog().renameView(from, to);
+
+ assertThat(catalog().viewExists(from)).as("View should not exist with old
name").isFalse();
+ assertThat(catalog().viewExists(to)).as("View should exist with new
name").isTrue();
+
+ // ensure view metadata didn't change after renaming
+ View renamed = catalog().loadView(to);
+ assertThat(((BaseView) renamed).operations().current())
+ .usingRecursiveComparison()
+ .ignoringFieldsOfTypes(Schema.class)
+ .isEqualTo(original);
+
+ assertThat(catalog().dropView(from)).isFalse();
+ assertThat(catalog().dropView(to)).isTrue();
+ assertThat(catalog().viewExists(to)).as("View should not exist").isFalse();
+ }
+
+ @Test
+ public void renameViewUsingDifferentNamespace() {
+ TableIdentifier from = TableIdentifier.of("ns", "view");
+ TableIdentifier to = TableIdentifier.of("other_ns", "renamedView");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(from.namespace());
+ catalog().createNamespace(to.namespace());
+ }
+
+ assertThat(catalog().viewExists(from)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(from)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(from.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+ ViewMetadata original = ((BaseView) view).operations().current();
+
+ catalog().renameView(from, to);
+
+ assertThat(catalog().viewExists(from)).as("View should not exist with old
name").isFalse();
+ assertThat(catalog().viewExists(to)).as("View should exist with new
name").isTrue();
+
+ // ensure view metadata didn't change after renaming
+ View renamed = catalog().loadView(to);
+ assertThat(((BaseView) renamed).operations().current())
+ .usingRecursiveComparison()
+ .ignoringFieldsOfTypes(Schema.class)
+ .isEqualTo(original);
+
+ assertThat(catalog().dropView(from)).isFalse();
+ assertThat(catalog().dropView(to)).isTrue();
+ assertThat(catalog().viewExists(to)).as("View should not exist").isFalse();
+ }
+
+ @Test
+ public void renameViewNamespaceMissing() {
+ TableIdentifier from = TableIdentifier.of("ns", "view");
+ TableIdentifier to = TableIdentifier.of("non_existing", "renamedView");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(from.namespace());
+ }
+
+ assertThat(catalog().viewExists(from)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(from)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(from.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(from)).as("View should exist").isTrue();
+
+ assertThatThrownBy(() -> catalog().renameView(from, to))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace does not exist: non_existing");
+ }
+
+ @Test
+ public void renameViewSourceMissing() {
+ TableIdentifier from = TableIdentifier.of("ns", "non_existing");
+ TableIdentifier to = TableIdentifier.of("ns", "renamedView");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(from.namespace());
+ }
+
+ assertThat(catalog().viewExists(from)).as("View should not
exist").isFalse();
+
+ assertThatThrownBy(() -> catalog().renameView(from, to))
+ .isInstanceOf(NoSuchViewException.class)
+ .hasMessageContaining("View does not exist");
+ }
+
+ @Test
+ public void renameViewTargetAlreadyExistsAsView() {
+ TableIdentifier viewOne = TableIdentifier.of("ns", "viewOne");
+ TableIdentifier viewTwo = TableIdentifier.of("ns", "viewTwo");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(viewOne.namespace());
+ }
+
+ for (TableIdentifier identifier : ImmutableList.of(viewOne, viewTwo)) {
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewOne.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+ }
+
+ assertThatThrownBy(() -> catalog().renameView(viewOne, viewTwo))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("Cannot rename ns.viewOne to ns.viewTwo. View
already exists");
+ }
+
+ @Test
+ public void renameViewTargetAlreadyExistsAsTable() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ TableIdentifier viewIdentifier = TableIdentifier.of("ns", "view");
+ TableIdentifier tableIdentifier = TableIdentifier.of("ns", "table");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(tableIdentifier.namespace());
+ }
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
not exist").isFalse();
+
+ tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+
+ assertThat(tableCatalog().tableExists(tableIdentifier)).as("Table should
exist").isTrue();
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(viewIdentifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewIdentifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(viewIdentifier)).as("View should
exist").isTrue();
+
+ assertThatThrownBy(() -> catalog().renameView(viewIdentifier,
tableIdentifier))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("Cannot rename ns.view to ns.table. Table
already exists");
+ }
+
+ @Test
+ public void listViews() {
+ Namespace ns1 = Namespace.of("ns1");
+ Namespace ns2 = Namespace.of("ns2");
+
+ TableIdentifier view1 = TableIdentifier.of(ns1, "view1");
+ TableIdentifier view2 = TableIdentifier.of(ns2, "view2");
+ TableIdentifier view3 = TableIdentifier.of(ns2, "view3");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(ns1);
+ catalog().createNamespace(ns2);
+ }
+
+ assertThat(catalog().listViews(ns1)).isEmpty();
+ assertThat(catalog().listViews(ns2)).isEmpty();
+
+ catalog()
+ .buildView(view1)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(view1.namespace())
+ .withQuery("spark", "select * from ns1.tbl")
+ .create();
+
+ assertThat(catalog().listViews(ns1)).containsExactly(view1);
+ assertThat(catalog().listViews(ns2)).isEmpty();
+
+ catalog()
+ .buildView(view2)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(view2.namespace())
+ .withQuery("spark", "select * from ns1.tbl")
+ .create();
+
+ assertThat(catalog().listViews(ns1)).containsExactly(view1);
+ assertThat(catalog().listViews(ns2)).containsExactly(view2);
+
+ catalog()
+ .buildView(view3)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(view3.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().listViews(ns1)).containsExactly(view1);
+ assertThat(catalog().listViews(ns2)).containsExactlyInAnyOrder(view2,
view3);
+
+ assertThat(catalog().dropView(view2)).isTrue();
+ assertThat(catalog().listViews(ns1)).containsExactly(view1);
+ assertThat(catalog().listViews(ns2)).containsExactly(view3);
+
+ assertThat(catalog().dropView(view3)).isTrue();
+ assertThat(catalog().listViews(ns1)).containsExactly(view1);
+ assertThat(catalog().listViews(ns2)).isEmpty();
+
+ assertThat(catalog().dropView(view1)).isTrue();
+ assertThat(catalog().listViews(ns1)).isEmpty();
+ assertThat(catalog().listViews(ns2)).isEmpty();
+ }
+
+ @Test
+ public void listViewsAndTables() {
+ Assumptions.assumeThat(tableCatalog())
+ .as("Only valid for catalogs that support tables")
+ .isNotNull();
+
+ Namespace ns = Namespace.of("ns");
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(ns, "table");
+ TableIdentifier viewIdentifier = TableIdentifier.of(ns, "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(ns);
+ }
+
+ assertThat(catalog().listViews(ns)).isEmpty();
+ assertThat(tableCatalog().listTables(ns)).isEmpty();
+
+ tableCatalog().buildTable(tableIdentifier, SCHEMA).create();
+ assertThat(catalog().listViews(ns)).isEmpty();
+ assertThat(tableCatalog().listTables(ns)).containsExactly(tableIdentifier);
+
+ catalog()
+ .buildView(viewIdentifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(viewIdentifier.namespace())
+ .withQuery("spark", "select * from ns1.tbl")
+ .create();
+
+ assertThat(catalog().listViews(ns)).containsExactly(viewIdentifier);
+ assertThat(tableCatalog().listTables(ns)).containsExactly(tableIdentifier);
+
+ assertThat(tableCatalog().dropTable(tableIdentifier)).isTrue();
+ assertThat(catalog().listViews(ns)).containsExactly(viewIdentifier);
+ assertThat(tableCatalog().listTables(ns)).isEmpty();
+
+ assertThat(catalog().dropView(viewIdentifier)).isTrue();
+ assertThat(catalog().listViews(ns)).isEmpty();
+ assertThat(tableCatalog().listTables(ns)).isEmpty();
+ }
+
+ @ParameterizedTest(name = ".createOrReplace() = {arguments}")
+ @ValueSource(booleans = {false, true})
+ public void createOrReplaceView(boolean useCreateOrReplace) {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ ViewBuilder viewBuilder =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .withProperty("prop1", "val1")
+ .withProperty("prop2", "val2");
+ View view = useCreateOrReplace ? viewBuilder.createOrReplace() :
viewBuilder.create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ ViewVersion viewVersion = view.currentVersion();
+ assertThat(viewVersion.representations())
+ .containsExactly(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("spark")
+ .build());
+
+ viewBuilder =
+ catalog()
+ .buildView(identifier)
+ .withSchema(OTHER_SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select count(*) from ns.tbl")
+ .withProperty("replacedProp1", "val1")
+ .withProperty("replacedProp2", "val2");
+ View replacedView = useCreateOrReplace ? viewBuilder.createOrReplace() :
viewBuilder.replace();
+
+ // validate replaced view settings
+
assertThat(replacedView.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(),
identifier));
+ assertThat(replacedView.properties())
+ .containsEntry("prop1", "val1")
+ .containsEntry("prop2", "val2")
+ .containsEntry("replacedProp1", "val1")
+ .containsEntry("replacedProp2", "val2");
+ assertThat(replacedView.history())
+ .hasSize(2)
+ .first()
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(1);
+ assertThat(replacedView.history())
+ .element(1)
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(2);
+
+ assertThat(replacedView.schema().schemaId()).isEqualTo(1);
+
assertThat(replacedView.schema().asStruct()).isEqualTo(OTHER_SCHEMA.asStruct());
+
assertThat(replacedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
+
+ ViewVersion replacedViewVersion = replacedView.currentVersion();
+ assertThat(replacedView.versions())
+ .hasSize(2)
+ .containsExactly(viewVersion, replacedViewVersion);
+ assertThat(replacedViewVersion).isNotNull();
+ assertThat(replacedViewVersion.versionId()).isEqualTo(2);
+ assertThat(replacedViewVersion.schemaId()).isEqualTo(1);
+ assertThat(replacedViewVersion.operation()).isEqualTo("replace");
+
assertThat(replacedViewVersion.summary()).hasSize(1).containsEntry("operation",
"replace");
+ assertThat(replacedViewVersion.representations())
+ .containsExactly(
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select count(*) from ns.tbl")
+ .dialect("trino")
+ .build());
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void replaceViewErrorCases() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ SQLViewRepresentation trino =
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("trino")
+ .build();
+
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(trino.dialect(), trino.sql())
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ // query is required
+ assertThatThrownBy(() -> catalog().buildView(identifier).replace())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot replace view without specifying a query");
+
+ // schema is required
+ assertThatThrownBy(
+ () -> catalog().buildView(identifier).withQuery(trino.dialect(),
trino.sql()).replace())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot replace view without specifying schema");
+
+ // default namespace is required
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(identifier)
+ .withQuery(trino.dialect(), trino.sql())
+ .withSchema(SCHEMA)
+ .replace())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot replace view without specifying a default
namespace");
+
+ // cannot replace non-existing view
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(TableIdentifier.of("ns", "non_existing"))
+ .withQuery(trino.dialect(), trino.sql())
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .replace())
+ .isInstanceOf(NoSuchViewException.class)
+ .hasMessageStartingWith("View does not exist: ns.non_existing");
+
+ // cannot define multiple SQLs for same dialect
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(trino.dialect(), trino.sql())
+ .withQuery(trino.dialect(), trino.sql())
+ .replace())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid view version: Cannot add multiple queries for
dialect trino");
+ }
+
+ @Test
+ public void updateViewProperties() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ ViewVersion viewVersion = view.currentVersion();
+
+ view.updateProperties().set("key1", "val1").set("key2",
"val2").remove("non-existing").commit();
+
+ View updatedView = catalog().loadView(identifier);
+ assertThat(updatedView.properties())
+ .containsEntry("key1", "val1")
+ .containsEntry("key2", "val2");
+
+ // history and view versions should stay the same after updating view
properties
+ assertThat(updatedView.history()).hasSize(1).isEqualTo(view.history());
+ assertThat(updatedView.versions()).hasSize(1).containsExactly(viewVersion);
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void updateViewPropertiesErrorCases() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("spark", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ assertThatThrownBy(
+ () -> catalog().loadView(identifier).updateProperties().set(null,
"new-val1").commit())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid key: null");
+
+ assertThatThrownBy(
+ () ->
catalog().loadView(identifier).updateProperties().set("key1", null).commit())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid value: null");
+
+ assertThatThrownBy(
+ () ->
catalog().loadView(identifier).updateProperties().remove(null).commit())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid key: null");
+
+ assertThatThrownBy(
+ () ->
+ catalog()
+ .loadView(identifier)
+ .updateProperties()
+ .set("key1", "x")
+ .set("key3", "y")
+ .remove("key2")
+ .set("key2", "z")
+ .commit())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot remove and update the same key: key2");
+ }
+
+ @Test
+ public void replaceViewVersion() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ SQLViewRepresentation spark =
+ ImmutableSQLViewRepresentation.builder()
+ .dialect("spark")
+ .sql("select * from ns.tbl")
+ .build();
+
+ SQLViewRepresentation trino =
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("trino")
+ .build();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(trino.dialect(), trino.sql())
+ .withQuery(spark.dialect(), spark.sql())
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ ViewVersion viewVersion = view.currentVersion();
+
assertThat(viewVersion.representations()).hasSize(2).containsExactly(trino,
spark);
+
+ // uses a different schema and view representation
+ view.replaceVersion()
+ .withSchema(OTHER_SCHEMA)
+ .withQuery(trino.dialect(), trino.sql())
+ .withDefaultCatalog("default")
+ .withDefaultNamespace(identifier.namespace())
+ .commit();
+
+ // history and view versions should reflect the changes
+ View updatedView = catalog().loadView(identifier);
+ assertThat(updatedView.history())
+ .hasSize(2)
+ .element(0)
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(viewVersion.versionId());
+ assertThat(updatedView.history())
+ .element(1)
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(updatedView.currentVersion().versionId());
+ assertThat(updatedView.schemas()).hasSize(2).containsKey(0).containsKey(1);
+ assertThat(updatedView.versions())
+ .hasSize(2)
+ .containsExactly(viewVersion, updatedView.currentVersion());
+
+ ViewVersion updatedViewVersion = updatedView.currentVersion();
+ assertThat(updatedViewVersion).isNotNull();
+
assertThat(updatedViewVersion.versionId()).isEqualTo(viewVersion.versionId() +
1);
+
assertThat(updatedViewVersion.summary()).hasSize(1).containsEntry("operation",
"replace");
+ assertThat(updatedViewVersion.operation()).isEqualTo("replace");
+
assertThat(updatedViewVersion.representations()).hasSize(1).containsExactly(trino);
+ assertThat(updatedViewVersion.schemaId()).isEqualTo(1);
+ assertThat(updatedViewVersion.defaultCatalog()).isEqualTo("default");
+
assertThat(updatedViewVersion.defaultNamespace()).isEqualTo(identifier.namespace());
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void replaceViewVersionByUpdatingSQLForDialect() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ SQLViewRepresentation spark =
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("spark")
+ .build();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(spark.dialect(), spark.sql())
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ ViewVersion viewVersion = view.currentVersion();
+
assertThat(viewVersion.representations()).hasSize(1).containsExactly(spark);
+
+ SQLViewRepresentation updatedSpark =
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.updated_tbl")
+ .dialect("spark")
+ .build();
+
+ // only update the SQL for spark
+ view.replaceVersion()
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(updatedSpark.dialect(), updatedSpark.sql())
+ .commit();
+
+ // history and view versions should reflect the changes
+ View updatedView = catalog().loadView(identifier);
+ assertThat(updatedView.history())
+ .hasSize(2)
+ .element(0)
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(viewVersion.versionId());
+ assertThat(updatedView.history())
+ .element(1)
+ .extracting(ViewHistoryEntry::versionId)
+ .isEqualTo(updatedView.currentVersion().versionId());
+ assertThat(updatedView.versions())
+ .hasSize(2)
+ .containsExactly(viewVersion, updatedView.currentVersion());
+
+ // updated view should have the new SQL
+ assertThat(updatedView.currentVersion().representations())
+ .hasSize(1)
+ .containsExactly(updatedSpark);
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void replaceViewVersionErrorCases() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ SQLViewRepresentation trino =
+ ImmutableSQLViewRepresentation.builder()
+ .sql("select * from ns.tbl")
+ .dialect("trino")
+ .build();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(trino.dialect(), trino.sql())
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ // empty commits are not allowed
+ assertThatThrownBy(() -> view.replaceVersion().commit())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot replace view without specifying a query");
+
+ // schema is required
+ assertThatThrownBy(
+ () ->
+ view.replaceVersion()
+ .withQuery(trino.dialect(), trino.sql())
+ .withDefaultNamespace(identifier.namespace())
+ .commit())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot replace view without specifying schema");
+
+ // default namespace is required
+ assertThatThrownBy(
+ () ->
+ view.replaceVersion()
+ .withQuery(trino.dialect(), trino.sql())
+ .withSchema(SCHEMA)
+ .commit())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Cannot replace view without specifying a default
namespace");
+
+ // cannot define multiple SQLs for same dialect
+ assertThatThrownBy(
+ () ->
+ view.replaceVersion()
+ .withQuery(trino.dialect(), trino.sql())
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery(trino.dialect(), trino.sql())
+ .withQuery(trino.dialect(), trino.sql())
+ .commit())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid view version: Cannot add multiple queries for
dialect trino");
+ }
+
+ @Test
+ public void updateViewPropertiesConflict() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+ UpdateViewProperties updateViewProperties = view.updateProperties();
+
+ // drop view and then try to use the updateProperties API
+ catalog().dropView(identifier);
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ assertThatThrownBy(() -> updateViewProperties.set("key1", "val1").commit())
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Cannot commit");
+ }
+
+ @Test
+ public void replaceViewVersionConflict() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+ ReplaceViewVersion replaceViewVersion = view.replaceVersion();
+
+ // drop view and then try to use the replaceVersion API
+ catalog().dropView(identifier);
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ assertThatThrownBy(
+ () ->
+ replaceViewVersion
+ .withQuery("trino", "select * from ns.tbl")
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .commit())
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Cannot commit");
+ }
+
+ @Test
+ public void createViewConflict() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ ViewBuilder viewBuilder = catalog().buildView(identifier);
+
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ // the view was already created concurrently
+ assertThatThrownBy(
+ () ->
+ viewBuilder
+ .withQuery("trino", "select * from ns.tbl")
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .create())
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("View already exists: ns.view");
+ }
+
+ @Test
+ public void replaceViewConflict() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+ ViewBuilder viewBuilder = catalog().buildView(identifier);
+
+ catalog().dropView(identifier);
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ // the view was already dropped concurrently
+ assertThatThrownBy(
+ () ->
+ viewBuilder
+ .withQuery("trino", "select * from ns.tbl")
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .replace())
+ .isInstanceOf(NoSuchViewException.class)
+ .hasMessageStartingWith("View does not exist: ns.view");
+ }
+}
diff --git
a/core/src/test/resources/org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json
b/core/src/test/resources/org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json
new file mode 100644
index 0000000000..f5bc045294
--- /dev/null
+++
b/core/src/test/resources/org/apache/iceberg/view/ViewMetadataMultipleSQLsForDialect.json
@@ -0,0 +1,63 @@
+{
+ "view-uuid": "fa6506c3-7681-40c8-86dc-e36561f83385",
+ "format-version": 1,
+ "location": "s3://bucket/test/location",
+ "properties": {"some-key": "some-value"},
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {
+ "id": 1,
+ "name": "x",
+ "required": true,
+ "type": "long"
+ },
+ {
+ "id": 2,
+ "name": "y",
+ "required": true,
+ "type": "long",
+ "doc": "comment"
+ },
+ {
+ "id": 3,
+ "name": "z",
+ "required": true,
+ "type": "long"
+ }
+ ]
+ }
+ ],
+ "current-version-id": 1,
+ "versions": [
+ {
+ "version-id": 1,
+ "timestamp-ms": 4353,
+ "summary": {"operation":"create"},
+ "schema-id": 0,
+ "default-catalog": "some-catalog",
+ "default-namespace": [],
+ "representations": [
+ {
+ "type": "sql",
+ "sql": "select 'foo' foo",
+ "dialect": "spark-sql"
+ },
+ {
+ "type": "sql",
+ "sql": "select * from foo",
+ "dialect": "spark-sql"
+ }
+ ]
+ }
+ ],
+ "version-log": [
+ {
+ "timestamp-ms": 4353,
+ "version-id": 1
+ }
+ ]
+}
\ No newline at end of file