This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bbf70a52e Spark: Bypass Spark's ViewCatalog API when replacing a view (#9596)
6bbf70a52e is described below

commit 6bbf70a52ebccfaba4e7e08facd72b84b571e2a6
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Feb 1 18:55:57 2024 +0100

    Spark: Bypass Spark's ViewCatalog API when replacing a view (#9596)
    
    Spark's `ViewCatalog` API doesn't have a `replaceView()` method in 3.5,
    as it was only introduced in a later Spark version. Therefore we bypass
    Spark's `ViewCatalog` API so that the view's history is kept after
    executing a `CREATE OR REPLACE VIEW`.
---
 .../datasources/v2/CreateV2ViewExec.scala          | 41 +++++++++++-----
 .../apache/iceberg/spark/extensions/TestViews.java | 39 +++++++++++++++
 .../org/apache/iceberg/spark/SparkCatalog.java     | 55 ++++++++++++++++++---
 .../apache/iceberg/spark/SupportsReplaceView.java  | 56 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/source/SparkView.java |  2 +-
 5 files changed, 172 insertions(+), 21 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 892e1eb857..e439415e9c 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.spark.SupportsReplaceView
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -57,20 +58,34 @@ case class CreateV2ViewExec(
 
     if (replace) {
       // CREATE OR REPLACE VIEW
-      if (catalog.viewExists(ident)) {
-        catalog.dropView(ident)
+      catalog match {
+        case c: SupportsReplaceView =>
+          c.replaceView(
+            ident,
+            queryText,
+            currentCatalog,
+            currentNamespace,
+            viewSchema,
+            queryColumnNames.toArray,
+            columnAliases.toArray,
+            columnComments.map(c => c.orNull).toArray,
+            newProperties.asJava)
+        case _ =>
+          if (catalog.viewExists(ident)) {
+            catalog.dropView(ident)
+          }
+
+          catalog.createView(
+            ident,
+            queryText,
+            currentCatalog,
+            currentNamespace,
+            viewSchema,
+            queryColumnNames.toArray,
+            columnAliases.toArray,
+            columnComments.map(c => c.orNull).toArray,
+            newProperties.asJava)
       }
-      // FIXME: replaceView API doesn't exist in Spark 3.5
-      catalog.createView(
-        ident,
-        queryText,
-        currentCatalog,
-        currentNamespace,
-        viewSchema,
-        queryColumnNames.toArray,
-        columnAliases.toArray,
-        columnComments.map(c => c.orNull).toArray,
-        newProperties.asJava)
     } else {
       try {
         // CREATE VIEW [IF NOT EXISTS]
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 1c4412ee11..5ca4a1e7a1 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -1549,6 +1549,45 @@ public class TestViews extends SparkExtensionsTestBase {
             "ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE 
VIEW instead");
   }
 
+  @Test
+  public void createOrReplaceViewKeepsViewHistory() {
+    String viewName = viewName("viewWithHistoryAfterReplace");
+    String sql = String.format("SELECT id, data FROM %s WHERE id <= 3", 
tableName);
+    String updatedSql = String.format("SELECT id FROM %s WHERE id > 3", 
tableName);
+
+    sql(
+        "CREATE VIEW %s (new_id COMMENT 'some ID', new_data COMMENT 'some 
data') AS %s",
+        viewName, sql);
+
+    View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, 
viewName));
+    assertThat(view.history()).hasSize(1);
+    assertThat(view.sqlFor("spark").sql()).isEqualTo(sql);
+    assertThat(view.currentVersion().versionId()).isEqualTo(1);
+    assertThat(view.currentVersion().schemaId()).isEqualTo(0);
+    assertThat(view.schemas()).hasSize(1);
+    assertThat(view.schema().asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(0, "new_id", 
Types.IntegerType.get(), "some ID"),
+                    Types.NestedField.optional(1, "new_data", 
Types.StringType.get(), "some data"))
+                .asStruct());
+
+    sql("CREATE OR REPLACE VIEW %s (updated_id COMMENT 'updated ID') AS %s", 
viewName, updatedSql);
+
+    view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
+    assertThat(view.history()).hasSize(2);
+    assertThat(view.sqlFor("spark").sql()).isEqualTo(updatedSql);
+    assertThat(view.currentVersion().versionId()).isEqualTo(2);
+    assertThat(view.currentVersion().schemaId()).isEqualTo(1);
+    assertThat(view.schemas()).hasSize(2);
+    assertThat(view.schema().asStruct())
+        .isEqualTo(
+            new Schema(
+                    Types.NestedField.optional(
+                        0, "updated_id", Types.IntegerType.get(), "updated 
ID"))
+                .asStruct());
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index a16b3ea572..4b2a45fabc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -52,6 +51,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -121,9 +121,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
  * <p>
  */
 public class SparkCatalog extends BaseCatalog
-    implements org.apache.spark.sql.connector.catalog.ViewCatalog {
+    implements org.apache.spark.sql.connector.catalog.ViewCatalog, 
SupportsReplaceView {
   private static final Set<String> DEFAULT_NS_KEYS = 
ImmutableSet.of(TableCatalog.PROP_OWNER);
   private static final Splitter COMMA = Splitter.on(",");
+  private static final Joiner COMMA_JOINER = Joiner.on(",");
   private static final Pattern AT_TIMESTAMP = 
Pattern.compile("at_timestamp_(\\d+)");
   private static final Pattern SNAPSHOT_ID = 
Pattern.compile("snapshot_id_(\\d+)");
   private static final Pattern BRANCH = Pattern.compile("branch_(.*)");
@@ -578,15 +579,13 @@ public class SparkCatalog extends BaseCatalog
     if (null != asViewCatalog) {
       Schema icebergSchema = SparkSchemaUtil.convert(schema);
 
-      StringJoiner joiner = new StringJoiner(",");
-      Arrays.stream(queryColumnNames).forEach(joiner::add);
-
       try {
         Map<String, String> props =
             ImmutableMap.<String, String>builder()
                 .putAll(Spark3Util.rebuildCreateProperties(properties))
-                .put("spark.query-column-names", joiner.toString())
-                .build();
+                .put(SparkView.QUERY_COLUMN_NAMES, 
COMMA_JOINER.join(queryColumnNames))
+                .buildKeepingLast();
+
         org.apache.iceberg.view.View view =
             asViewCatalog
                 .buildView(buildIdentifier(ident))
@@ -609,6 +608,48 @@ public class SparkCatalog extends BaseCatalog
         "Creating a view is not supported by catalog: " + catalogName);
   }
 
+  @Override
+  public View replaceView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws NoSuchNamespaceException {
+    if (null != asViewCatalog) {
+      Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+      try {
+        Map<String, String> props =
+            ImmutableMap.<String, String>builder()
+                .putAll(Spark3Util.rebuildCreateProperties(properties))
+                .put(SparkView.QUERY_COLUMN_NAMES, 
COMMA_JOINER.join(queryColumnNames))
+                .buildKeepingLast();
+
+        org.apache.iceberg.view.View view =
+            asViewCatalog
+                .buildView(buildIdentifier(ident))
+                .withDefaultCatalog(currentCatalog)
+                .withDefaultNamespace(Namespace.of(currentNamespace))
+                .withQuery("spark", sql)
+                .withSchema(icebergSchema)
+                .withLocation(properties.get("location"))
+                .withProperties(props)
+                .replace();
+        return new SparkView(catalogName, view);
+      } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+        throw new NoSuchNamespaceException(currentNamespace);
+      }
+    }
+
+    throw new UnsupportedOperationException(
+        "Replacing a view is not supported by catalog: " + catalogName);
+  }
+
   @Override
   public View alterView(Identifier ident, ViewChange... changes)
       throws NoSuchViewException, IllegalArgumentException {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SupportsReplaceView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SupportsReplaceView.java
new file mode 100644
index 0000000000..8bdb7b1386
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SupportsReplaceView.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.apache.spark.sql.types.StructType;
+
+public interface SupportsReplaceView extends ViewCatalog {
+  /**
+   * Replace a view in the catalog
+   *
+   * @param ident a view identifier
+   * @param sql the SQL text that defines the view
+   * @param currentCatalog the current catalog
+   * @param currentNamespace the current namespace
+   * @param schema the view query output schema
+   * @param queryColumnNames the query column names
+   * @param columnAliases the column aliases
+   * @param columnComments the column comments
+   * @param properties the view properties
+   * @throws NoSuchViewException If the view doesn't exist or is a table
+   * @throws NoSuchNamespaceException If the identifier namespace does not 
exist (optional)
+   */
+  View replaceView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws NoSuchViewException, NoSuchNamespaceException;
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index d5a3dfc972..47e5729536 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.StructType;
 
 public class SparkView implements org.apache.spark.sql.connector.catalog.View {
 
-  private static final String QUERY_COLUMN_NAMES = "spark.query-column-names";
+  public static final String QUERY_COLUMN_NAMES = "spark.query-column-names";
   public static final Set<String> RESERVED_PROPERTIES =
       ImmutableSet.of("provider", "location", FORMAT_VERSION, 
QUERY_COLUMN_NAMES);
 

Reply via email to