This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d2e1094ee0 API, Core: Allow setting a View's location (#8648)
d2e1094ee0 is described below
commit d2e1094ee0cc6239d43f63ba5114272f59d605d2
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Oct 4 11:17:34 2023 +0200
API, Core: Allow setting a View's location (#8648)
---
.../java/org/apache/iceberg/UpdateLocation.java | 4 +-
.../main/java/org/apache/iceberg/view/View.java | 19 ++++
.../java/org/apache/iceberg/view/ViewBuilder.java | 10 ++
.../iceberg/view/BaseMetastoreViewCatalog.java | 20 +++-
.../java/org/apache/iceberg/view/BaseView.java | 11 ++
.../org/apache/iceberg/view/SetViewLocation.java | 76 ++++++++++++++
.../org/apache/iceberg/view/ViewCatalogTests.java | 112 +++++++++++++++++++++
7 files changed, 246 insertions(+), 6 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/UpdateLocation.java
b/api/src/main/java/org/apache/iceberg/UpdateLocation.java
index 646fbb1229..b069d32f10 100644
--- a/api/src/main/java/org/apache/iceberg/UpdateLocation.java
+++ b/api/src/main/java/org/apache/iceberg/UpdateLocation.java
@@ -18,10 +18,10 @@
*/
package org.apache.iceberg;
-/** API for setting a table's base location. */
+/** API for setting a table's or view's base location. */
public interface UpdateLocation extends PendingUpdate<String> {
/**
- * Set the table's location.
+ * Set the table's or view's location.
*
* @param location a String location
* @return this for method chaining
diff --git a/api/src/main/java/org/apache/iceberg/view/View.java
b/api/src/main/java/org/apache/iceberg/view/View.java
index 284c561b78..9c33c545ec 100644
--- a/api/src/main/java/org/apache/iceberg/view/View.java
+++ b/api/src/main/java/org/apache/iceberg/view/View.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.view;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateLocation;
/** Interface for view definition. */
public interface View {
@@ -77,6 +78,15 @@ public interface View {
*/
Map<String, String> properties();
+ /**
+ * Return the view's base location.
+ *
+ * @return this view's location
+ */
+ default String location() {
+ throw new UnsupportedOperationException("Retrieving a view's location is
not supported");
+ }
+
/**
* Create a new {@link UpdateViewProperties} to update view properties.
*
@@ -92,4 +102,13 @@ public interface View {
default ReplaceViewVersion replaceVersion() {
throw new UnsupportedOperationException("Replacing a view's version is not
supported");
}
+
+ /**
+ * Create a new {@link UpdateLocation} to set the view's location.
+ *
+ * @return a new {@link UpdateLocation}
+ */
+ default UpdateLocation updateLocation() {
+ throw new UnsupportedOperationException("Updating a view's location is not
supported");
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/view/ViewBuilder.java
b/api/src/main/java/org/apache/iceberg/view/ViewBuilder.java
index 02620de722..0717e492fc 100644
--- a/api/src/main/java/org/apache/iceberg/view/ViewBuilder.java
+++ b/api/src/main/java/org/apache/iceberg/view/ViewBuilder.java
@@ -45,6 +45,16 @@ public interface ViewBuilder extends
VersionBuilder<ViewBuilder> {
*/
ViewBuilder withProperty(String key, String value);
+ /**
+ * Sets a location for the view
+ *
+ * @param location the location to set for the view
+ * @return this for method chaining
+ */
+ default ViewBuilder withLocation(String location) {
+ throw new UnsupportedOperationException("Setting a view's location is not
supported");
+ }
+
/**
* Create the view.
*
diff --git
a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
index 42eb80a047..1cede9b2e7 100644
--- a/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/view/BaseMetastoreViewCatalog.java
@@ -71,6 +71,7 @@ public abstract class BaseMetastoreViewCatalog extends
BaseMetastoreCatalog impl
private Namespace defaultNamespace = null;
private String defaultCatalog = null;
private Schema schema = null;
+ private String location = null;
protected BaseViewBuilder(TableIdentifier identifier) {
Preconditions.checkArgument(
@@ -115,6 +116,12 @@ public abstract class BaseMetastoreViewCatalog extends
BaseMetastoreCatalog impl
return this;
}
+ @Override
+ public ViewBuilder withLocation(String newLocation) {
+ this.location = newLocation;
+ return this;
+ }
+
@Override
public View create() {
return create(newViewOps(identifier));
@@ -160,7 +167,7 @@ public abstract class BaseMetastoreViewCatalog extends
BaseMetastoreCatalog impl
ViewMetadata viewMetadata =
ViewMetadata.builder()
.setProperties(properties)
- .setLocation(defaultWarehouseLocation(identifier))
+ .setLocation(null != location ? location :
defaultWarehouseLocation(identifier))
.setCurrentVersion(viewVersion, schema)
.build();
@@ -202,11 +209,16 @@ public abstract class BaseMetastoreViewCatalog extends
BaseMetastoreCatalog impl
.putSummary("operation", "replace")
.build();
- ViewMetadata replacement =
+ ViewMetadata.Builder builder =
ViewMetadata.buildFrom(metadata)
.setProperties(properties)
- .setCurrentVersion(viewVersion, schema)
- .build();
+ .setCurrentVersion(viewVersion, schema);
+
+ if (null != location) {
+ builder.setLocation(location);
+ }
+
+ ViewMetadata replacement = builder.build();
try {
ops.commit(metadata, replacement);
diff --git a/core/src/main/java/org/apache/iceberg/view/BaseView.java
b/core/src/main/java/org/apache/iceberg/view/BaseView.java
index a21bc2381f..a1b2863eef 100644
--- a/core/src/main/java/org/apache/iceberg/view/BaseView.java
+++ b/core/src/main/java/org/apache/iceberg/view/BaseView.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateLocation;
public class BaseView implements View, Serializable {
@@ -77,6 +78,11 @@ public class BaseView implements View, Serializable {
return operations().current().properties();
}
+ @Override
+ public String location() {
+ return operations().current().location();
+ }
+
@Override
public UpdateViewProperties updateProperties() {
return new PropertiesUpdate(ops);
@@ -86,4 +92,9 @@ public class BaseView implements View, Serializable {
public ReplaceViewVersion replaceVersion() {
return new ViewVersionReplace(ops);
}
+
+ @Override
+ public UpdateLocation updateLocation() {
+ return new SetViewLocation(ops);
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java
b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java
new file mode 100644
index 0000000000..481118c859
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.view;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static
org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+import org.apache.iceberg.UpdateLocation;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+
+class SetViewLocation implements UpdateLocation {
+ private final ViewOperations ops;
+ private String newLocation = null;
+
+ SetViewLocation(ViewOperations ops) {
+ this.ops = ops;
+ }
+
+ @Override
+ public String apply() {
+ Preconditions.checkState(null != newLocation, "Invalid view location:
null");
+ return newLocation;
+ }
+
+ @Override
+ public void commit() {
+ ViewMetadata base = ops.refresh();
+ Tasks.foreach(ops)
+ .retry(
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_NUM_RETRIES,
COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_MIN_RETRY_WAIT_MS,
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_MAX_RETRY_WAIT_MS,
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(
+ base.properties(), COMMIT_TOTAL_RETRY_TIME_MS,
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .onlyRetryOn(CommitFailedException.class)
+ .run(
+ taskOps ->
+ taskOps.commit(base,
ViewMetadata.buildFrom(base).setLocation(apply()).build()));
+ }
+
+ @Override
+ public UpdateLocation setLocation(String location) {
+ this.newLocation = location;
+ return this;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
index 1c95955383..3e19aaddee 100644
--- a/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/view/ViewCatalogTests.java
@@ -24,6 +24,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateLocation;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -131,6 +132,7 @@ public abstract class ViewCatalogTests<C extends
ViewCatalog & SupportsNamespace
.withQuery("trino", "select * from ns.tbl using X")
.withProperty("prop1", "val1")
.withProperty("prop2", "val2")
+ .withLocation("file://tmp/ns/view")
.create();
assertThat(view).isNotNull();
@@ -138,6 +140,7 @@ public abstract class ViewCatalogTests<C extends
ViewCatalog & SupportsNamespace
// validate view settings
assertThat(view.name()).isEqualTo(ViewUtil.fullViewName(catalog().name(),
identifier));
+ assertThat(view.location()).isEqualTo("file://tmp/ns/view");
assertThat(view.properties()).containsEntry("prop1",
"val1").containsEntry("prop2", "val2");
assertThat(view.history())
.hasSize(1)
@@ -1337,4 +1340,113 @@ public abstract class ViewCatalogTests<C extends
ViewCatalog & SupportsNamespace
.isInstanceOf(NoSuchViewException.class)
.hasMessageStartingWith("View does not exist: ns.view");
}
+
+ @Test
+ public void createAndReplaceViewWithLocation() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .withLocation("file://tmp/ns/view")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+ assertThat(view.location()).isEqualTo("file://tmp/ns/view");
+
+ view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .withLocation("file://updated_tmp/ns/view")
+ .replace();
+
+ assertThat(view.location()).isEqualTo("file://updated_tmp/ns/view");
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void updateViewLocation() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .withLocation("file://tmp/ns/view")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+ assertThat(view.location()).isEqualTo("file://tmp/ns/view");
+
+ view.updateLocation().setLocation("file://updated_tmp/ns/view").commit();
+
+ View updatedView = catalog().loadView(identifier);
+
+ assertThat(updatedView.location()).isEqualTo("file://updated_tmp/ns/view");
+
+ // history and view versions should stay the same after updating view
properties
+ assertThat(updatedView.history()).hasSize(1).isEqualTo(view.history());
+
assertThat(updatedView.versions()).hasSize(1).containsExactly(view.currentVersion());
+
+ assertThat(catalog().dropView(identifier)).isTrue();
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+ }
+
+ @Test
+ public void updateViewLocationConflict() {
+ TableIdentifier identifier = TableIdentifier.of("ns", "view");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(identifier.namespace());
+ }
+
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ View view =
+ catalog()
+ .buildView(identifier)
+ .withSchema(SCHEMA)
+ .withDefaultNamespace(identifier.namespace())
+ .withQuery("trino", "select * from ns.tbl")
+ .create();
+
+ assertThat(catalog().viewExists(identifier)).as("View should
exist").isTrue();
+
+ // new location must be non-null
+ assertThatThrownBy(() -> view.updateLocation().setLocation(null).commit())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Invalid view location: null");
+
+ UpdateLocation updateViewLocation = view.updateLocation();
+
+ catalog().dropView(identifier);
+ assertThat(catalog().viewExists(identifier)).as("View should not
exist").isFalse();
+
+ // the view was already dropped concurrently
+ assertThatThrownBy(() ->
updateViewLocation.setLocation("new-location").commit())
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Cannot commit");
+ }
}