This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6bbf70a52e Spark: Bypass Spark's ViewCatalog API when replacing a view (#9596)
6bbf70a52e is described below
commit 6bbf70a52ebccfaba4e7e08facd72b84b571e2a6
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Feb 1 18:55:57 2024 +0100
Spark: Bypass Spark's ViewCatalog API when replacing a view (#9596)
Spark's `ViewCatalog` API in 3.5 has no `replaceView()` method; it was only
introduced in a later Spark version. Therefore we bypass Spark's `ViewCatalog`
so that a view keeps its history after a `CREATE OR REPLACE VIEW` instead of
being dropped and re-created.
---
.../datasources/v2/CreateV2ViewExec.scala | 41 +++++++++++-----
.../apache/iceberg/spark/extensions/TestViews.java | 39 +++++++++++++++
.../org/apache/iceberg/spark/SparkCatalog.java | 55 ++++++++++++++++++---
.../apache/iceberg/spark/SupportsReplaceView.java | 56 ++++++++++++++++++++++
.../org/apache/iceberg/spark/source/SparkView.java | 2 +-
5 files changed, 172 insertions(+), 21 deletions(-)
diff --git
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 892e1eb857..e439415e9c 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.datasources.v2
+import org.apache.iceberg.spark.SupportsReplaceView
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -57,20 +58,34 @@ case class CreateV2ViewExec(
if (replace) {
// CREATE OR REPLACE VIEW
- if (catalog.viewExists(ident)) {
- catalog.dropView(ident)
+ catalog match {
+ case c: SupportsReplaceView =>
+ c.replaceView(
+ ident,
+ queryText,
+ currentCatalog,
+ currentNamespace,
+ viewSchema,
+ queryColumnNames.toArray,
+ columnAliases.toArray,
+ columnComments.map(c => c.orNull).toArray,
+ newProperties.asJava)
+ case _ =>
+ if (catalog.viewExists(ident)) {
+ catalog.dropView(ident)
+ }
+
+ catalog.createView(
+ ident,
+ queryText,
+ currentCatalog,
+ currentNamespace,
+ viewSchema,
+ queryColumnNames.toArray,
+ columnAliases.toArray,
+ columnComments.map(c => c.orNull).toArray,
+ newProperties.asJava)
}
- // FIXME: replaceView API doesn't exist in Spark 3.5
- catalog.createView(
- ident,
- queryText,
- currentCatalog,
- currentNamespace,
- viewSchema,
- queryColumnNames.toArray,
- columnAliases.toArray,
- columnComments.map(c => c.orNull).toArray,
- newProperties.asJava)
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 1c4412ee11..5ca4a1e7a1 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -1549,6 +1549,45 @@ public class TestViews extends SparkExtensionsTestBase {
"ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE
VIEW instead");
}
+ // Verifies that CREATE OR REPLACE VIEW on an existing view adds a new view version
+ // (history grows from 1 to 2 entries and a second schema is recorded) rather than
+ // dropping and re-creating the view, which would reset its history.
+ @Test
+ public void createOrReplaceViewKeepsViewHistory() {
+ String viewName = viewName("viewWithHistoryAfterReplace");
+ String sql = String.format("SELECT id, data FROM %s WHERE id <= 3",
tableName);
+ String updatedSql = String.format("SELECT id FROM %s WHERE id > 3",
tableName);
+
+ sql(
+ "CREATE VIEW %s (new_id COMMENT 'some ID', new_data COMMENT 'some
data') AS %s",
+ viewName, sql);
+
+ // the freshly created view has exactly one version and one schema
+ View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE,
viewName));
+ assertThat(view.history()).hasSize(1);
+ assertThat(view.sqlFor("spark").sql()).isEqualTo(sql);
+ assertThat(view.currentVersion().versionId()).isEqualTo(1);
+ assertThat(view.currentVersion().schemaId()).isEqualTo(0);
+ assertThat(view.schemas()).hasSize(1);
+ assertThat(view.schema().asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(0, "new_id",
Types.IntegerType.get(), "some ID"),
+ Types.NestedField.optional(1, "new_data",
Types.StringType.get(), "some data"))
+ .asStruct());
+
+ // replace with a different query and column list; the old version must be retained
+ sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s",
viewName, updatedSql);
+
+ view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+ assertThat(view.history()).hasSize(2);
+ assertThat(view.sqlFor("spark").sql()).isEqualTo(updatedSql);
+ assertThat(view.currentVersion().versionId()).isEqualTo(2);
+ assertThat(view.currentVersion().schemaId()).isEqualTo(1);
+ assertThat(view.schemas()).hasSize(2);
+ assertThat(view.schema().asStruct())
+ .isEqualTo(
+ new Schema(
+ Types.NestedField.optional(
+ 0, "updated_id", Types.IntegerType.get(), "updated
ID"))
+ .asStruct());
+ }
+
private void insertRows(int numRows) throws NoSuchTableException {
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
for (int i = 1; i <= numRows; i++) {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index a16b3ea572..4b2a45fabc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
@@ -52,6 +51,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -121,9 +121,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
* <p>
*/
public class SparkCatalog extends BaseCatalog
- implements org.apache.spark.sql.connector.catalog.ViewCatalog {
+ implements org.apache.spark.sql.connector.catalog.ViewCatalog,
SupportsReplaceView {
private static final Set<String> DEFAULT_NS_KEYS =
ImmutableSet.of(TableCatalog.PROP_OWNER);
private static final Splitter COMMA = Splitter.on(",");
+ private static final Joiner COMMA_JOINER = Joiner.on(",");
private static final Pattern AT_TIMESTAMP =
Pattern.compile("at_timestamp_(\\d+)");
private static final Pattern SNAPSHOT_ID =
Pattern.compile("snapshot_id_(\\d+)");
private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
@@ -578,15 +579,13 @@ public class SparkCatalog extends BaseCatalog
if (null != asViewCatalog) {
Schema icebergSchema = SparkSchemaUtil.convert(schema);
- StringJoiner joiner = new StringJoiner(",");
- Arrays.stream(queryColumnNames).forEach(joiner::add);
-
try {
Map<String, String> props =
ImmutableMap.<String, String>builder()
.putAll(Spark3Util.rebuildCreateProperties(properties))
- .put("spark.query-column-names", joiner.toString())
- .build();
+ .put(SparkView.QUERY_COLUMN_NAMES,
COMMA_JOINER.join(queryColumnNames))
+ .buildKeepingLast();
+
org.apache.iceberg.view.View view =
asViewCatalog
.buildView(buildIdentifier(ident))
@@ -609,6 +608,48 @@ public class SparkCatalog extends BaseCatalog
"Creating a view is not supported by catalog: " + catalogName);
}
+  /**
+   * Replaces (or, if it does not exist yet, creates) the Iceberg view behind {@code ident}.
+   *
+   * <p>Used for {@code CREATE OR REPLACE VIEW}: replacing through Iceberg adds a new view version
+   * instead of dropping and re-creating the view, so the view's history is preserved.
+   *
+   * @throws NoSuchNamespaceException if the namespace of {@code ident} does not exist
+   * @throws UnsupportedOperationException if the underlying catalog does not support views
+   */
+  @Override
+  public View replaceView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws NoSuchNamespaceException {
+    if (null != asViewCatalog) {
+      Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+      try {
+        Map<String, String> props =
+            ImmutableMap.<String, String>builder()
+                .putAll(Spark3Util.rebuildCreateProperties(properties))
+                .put(SparkView.QUERY_COLUMN_NAMES, COMMA_JOINER.join(queryColumnNames))
+                .buildKeepingLast();
+
+        // CREATE OR REPLACE VIEW is routed here whether or not the view already exists.
+        // ViewBuilder#replace() fails for a missing view, while createOrReplace() creates it
+        // when missing and otherwise adds a new version (keeping the view's history).
+        org.apache.iceberg.view.View view =
+            asViewCatalog
+                .buildView(buildIdentifier(ident))
+                .withDefaultCatalog(currentCatalog)
+                .withDefaultNamespace(Namespace.of(currentNamespace))
+                .withQuery("spark", sql)
+                .withSchema(icebergSchema)
+                .withLocation(properties.get("location"))
+                .withProperties(props)
+                .createOrReplace();
+        return new SparkView(catalogName, view);
+      } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+        // The view is written into ident's namespace; currentNamespace is only the default
+        // used to resolve the view's SQL, so it is the wrong namespace to report.
+        throw new NoSuchNamespaceException(ident.namespace());
+      }
+    }
+
+    throw new UnsupportedOperationException(
+        "Replacing a view is not supported by catalog: " + catalogName);
+  }
+
@Override
public View alterView(Identifier ident, ViewChange... changes)
throws NoSuchViewException, IllegalArgumentException {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SupportsReplaceView.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SupportsReplaceView.java
new file mode 100644
index 0000000000..8bdb7b1386
--- /dev/null
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SupportsReplaceView.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A {@link ViewCatalog} that can replace an existing view in place.
+ *
+ * <p>Spark 3.5's {@link ViewCatalog} API has no replace operation, so without this interface a
+ * {@code CREATE OR REPLACE VIEW} has to drop the view and create it again, which loses the view's
+ * history. Catalogs implementing this interface are given the replace request directly instead.
+ */
+public interface SupportsReplaceView extends ViewCatalog {
+ /**
+ * Replace a view in the catalog.
+ *
+ * @param ident a view identifier
+ * @param sql the SQL text that defines the view
+ * @param currentCatalog the current catalog
+ * @param currentNamespace the current namespace
+ * @param schema the view query output schema
+ * @param queryColumnNames the query column names
+ * @param columnAliases the column aliases
+ * @param columnComments the column comments
+ * @param properties the view properties
+ * @return the view as it exists after the replacement
+ * @throws NoSuchViewException If the view doesn't exist or is a table
+ * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+ */
+ View replaceView(
+ Identifier ident,
+ String sql,
+ String currentCatalog,
+ String[] currentNamespace,
+ StructType schema,
+ String[] queryColumnNames,
+ String[] columnAliases,
+ String[] columnComments,
+ Map<String, String> properties)
+ throws NoSuchViewException, NoSuchNamespaceException;
+}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index d5a3dfc972..47e5729536 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.StructType;
public class SparkView implements org.apache.spark.sql.connector.catalog.View {
- private static final String QUERY_COLUMN_NAMES = "spark.query-column-names";
+ public static final String QUERY_COLUMN_NAMES = "spark.query-column-names";
public static final Set<String> RESERVED_PROPERTIES =
ImmutableSet.of("provider", "location", FORMAT_VERSION,
QUERY_COLUMN_NAMES);