This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b87d5f7750a [SPARK-40551][SQL] DataSource V2: Add APIs for delta-based row-level operations
b87d5f7750a is described below

commit b87d5f7750a533acb45b2b75474cdde5dc7d92a0
Author: Anton Okolnychyi <aokolnyc...@apple.com>
AuthorDate: Thu Oct 13 11:50:20 2022 -0700

    [SPARK-40551][SQL] DataSource V2: Add APIs for delta-based row-level operations
    
    ### What changes were proposed in this pull request?
    
    This PR adds DS v2 APIs for handling row-level operations for data sources that support deltas of rows.
    
    ### Why are the changes needed?
    
    These changes are part of the approved SPIP in SPARK-35801.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR adds new DS v2 APIs per [design doc](https://docs.google.com/document/d/12Ywmc47j3l2WF4anG5vL4qlrhT2OKigb7_EbIKhxg60).
    
    ### How was this patch tested?
    
    Tests will be part of the implementation PR.
    
    Closes #38004 from aokolnychyi/spark-40551.
    
    Lead-authored-by: Anton Okolnychyi <aokolnyc...@apple.com>
    Co-authored-by: aokolnychyi <aokolnyc...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/connector/write/DeltaBatchWrite.java}      | 19 ++++---
 .../spark/sql/connector/write/DeltaWrite.java}     | 21 +++++---
 .../sql/connector/write/DeltaWriteBuilder.java}    | 21 +++++---
 .../spark/sql/connector/write/DeltaWriter.java     | 63 ++++++++++++++++++++++
 .../sql/connector/write/DeltaWriterFactory.java}   | 22 +++++---
 .../sql/connector/write/LogicalWriteInfo.java      | 18 +++++++
 .../spark/sql/connector/write/SupportsDelta.java}  | 26 ++++++---
 .../sql/connector/write/LogicalWriteInfoImpl.scala |  7 ++-
 8 files changed, 161 insertions(+), 36 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaBatchWrite.java
similarity index 69%
copy from 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaBatchWrite.java
index b1492e42981..86c48b85dcd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaBatchWrite.java
@@ -15,12 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector.write
+package org.apache.spark.sql.connector.write;
 
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.annotation.Experimental;
 
-private[sql] case class LogicalWriteInfoImpl(
-    queryId: String,
-    schema: StructType,
-    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
+/**
+ * An interface that defines how to write a delta of rows during batch 
processing.
+ *
+ * @since 3.4.0
+ */
+@Experimental
+public interface DeltaBatchWrite extends BatchWrite {
+  @Override
+  DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info);
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWrite.java
similarity index 65%
copy from 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWrite.java
index b1492e42981..eb230598ef4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWrite.java
@@ -15,12 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector.write
+package org.apache.spark.sql.connector.write;
 
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.annotation.Experimental;
 
-private[sql] case class LogicalWriteInfoImpl(
-    queryId: String,
-    schema: StructType,
-    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
+/**
+ * A logical representation of a data source write that handles a delta of 
rows.
+ *
+ * @since 3.4.0
+ */
+@Experimental
+public interface DeltaWrite extends Write {
+  @Override
+  default DeltaBatchWrite toBatch() {
+    throw new UnsupportedOperationException(description() + ": Delta batch 
write is not supported");
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriteBuilder.java
similarity index 67%
copy from 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriteBuilder.java
index b1492e42981..dde3214170f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriteBuilder.java
@@ -15,12 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector.write
+package org.apache.spark.sql.connector.write;
 
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.annotation.Experimental;
 
-private[sql] case class LogicalWriteInfoImpl(
-    queryId: String,
-    schema: StructType,
-    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
+/**
+ * An interface for building a {@link DeltaWrite}.
+ *
+ * @since 3.4.0
+ */
+@Experimental
+public interface DeltaWriteBuilder extends WriteBuilder {
+  @Override
+  default DeltaWrite build() {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
implement build");
+  }
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriter.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriter.java
new file mode 100644
index 00000000000..0cc6cb48801
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import java.io.IOException;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * A data writer returned by {@link DeltaWriterFactory#createWriter(int, 
long)} and is
+ * responsible for writing a delta of rows.
+ *
+ * @since 3.4.0
+ */
+@Experimental
+public interface DeltaWriter<T> extends DataWriter<T> {
+  /**
+   * Deletes a row.
+   *
+   * @param metadata values for metadata columns that were projected but are 
not part of the row ID
+   * @param id a row ID to delete
+   * @throws IOException if failure happens during disk/network IO like 
writing files
+   */
+  void delete(T metadata, T id) throws IOException;
+
+  /**
+   * Updates a row.
+   *
+   * @param metadata values for metadata columns that were projected but are 
not part of the row ID
+   * @param id a row ID to update
+   * @param row a row with updated values
+   * @throws IOException if failure happens during disk/network IO like 
writing files
+   */
+  void update(T metadata, T id, T row) throws IOException;
+
+  /**
+   * Inserts a new row.
+   *
+   * @param row a row to insert
+   * @throws IOException if failure happens during disk/network IO like 
writing files
+   */
+  void insert(T row) throws IOException;
+
+  @Override
+  default void write(T row) throws IOException {
+    insert(row);
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriterFactory.java
similarity index 59%
copy from 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriterFactory.java
index b1492e42981..0f9c1f91833 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeltaWriterFactory.java
@@ -15,12 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector.write
+package org.apache.spark.sql.connector.write;
 
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.catalyst.InternalRow;
 
-private[sql] case class LogicalWriteInfoImpl(
-    queryId: String,
-    schema: StructType,
-    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
+/**
+ * A factory for creating {@link DeltaWriter}s returned by
+ * {@link DeltaBatchWrite#createBatchWriterFactory(PhysicalWriteInfo)}, which 
is responsible for
+ * creating and initializing writers at the executor side.
+ *
+ * @since 3.4.0
+ */
+@Experimental
+public interface DeltaWriterFactory extends DataWriterFactory {
+  @Override
+  DeltaWriter<InternalRow> createWriter(int partitionId, long taskId);
+}
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
index e472a130187..bdf1bb3b9c0 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/LogicalWriteInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.connector.write;
 
+import java.util.Optional;
+
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -45,4 +47,20 @@ public interface LogicalWriteInfo {
    * the schema of the input data from Spark to data source.
    */
   StructType schema();
+
+  /**
+   * the schema of the ID columns from Spark to data source.
+   */
+  default Optional<StructType> rowIdSchema() {
+    throw new UnsupportedOperationException(
+        getClass().getName() + " does not implement rowIdSchema");
+  }
+
+  /**
+   * the schema of the input metadata from Spark to data source.
+   */
+  default Optional<StructType> metadataSchema() {
+    throw new UnsupportedOperationException(
+        getClass().getName() + " does not implement metadataSchema");
+  }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsDelta.java
similarity index 57%
copy from 
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
copy to 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsDelta.java
index b1492e42981..6315b65f610 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsDelta.java
@@ -15,12 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector.write
+package org.apache.spark.sql.connector.write;
 
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 
-private[sql] case class LogicalWriteInfoImpl(
-    queryId: String,
-    schema: StructType,
-    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
+/**
+ * A mix-in interface for {@link RowLevelOperation}. Data sources can 
implement this interface
+ * to indicate they support handling deltas of rows.
+ *
+ * @since 3.4.0
+ */
+@Experimental
+public interface SupportsDelta extends RowLevelOperation {
+  @Override
+  DeltaWriteBuilder newWriteBuilder(LogicalWriteInfo info);
+
+  /**
+   * Returns the row ID column references that should be used for row equality.
+   */
+  NamedReference[] rowId();
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
index b1492e42981..8c0828d8a27 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/LogicalWriteInfoImpl.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql.connector.write
 
+import java.util.Optional
+
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 private[sql] case class LogicalWriteInfoImpl(
     queryId: String,
     schema: StructType,
-    options: CaseInsensitiveStringMap) extends LogicalWriteInfo
+    options: CaseInsensitiveStringMap,
+    override val rowIdSchema: Optional[StructType] = 
Optional.empty[StructType],
+    override val metadataSchema: Optional[StructType] = 
Optional.empty[StructType])
+  extends LogicalWriteInfo


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to