This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new abf8770ffac7 [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for
supporting python data source streaming write
abf8770ffac7 is described below
commit abf8770ffac7ac5f4dcd5b7b94b744b0267b34d9
Author: Chaoqin Li <[email protected]>
AuthorDate: Thu Feb 8 12:16:49 2024 +0900
[SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python
data source streaming write
### What changes were proposed in this pull request?
Move PythonBatchWrite out of PythonWrite.
### Why are the changes needed?
This is to prepare for supporting python data source streaming write in the
future.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Trivial code refactoring; existing tests are sufficient.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45049 from chaoqin-li1123/python_sink.
Authored-by: Chaoqin Li <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../datasources/v2/python/PythonWrite.scala | 34 +++++++++++++---------
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
index d216dfde9974..a10a18e43f64 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonWrite.scala
@@ -26,15 +26,32 @@ class PythonWrite(
shortName: String,
info: LogicalWriteInfo,
isTruncate: Boolean
- ) extends Write with BatchWrite {
- private[this] val jobArtifactUUID =
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+ ) extends Write {
+
+ override def toString: String = shortName
+
+ override def toBatch: BatchWrite = new PythonBatchWrite(ds, shortName, info,
isTruncate)
+
+ override def description: String = "(Python)"
+
+ override def supportedCustomMetrics(): Array[CustomMetric] =
+ ds.source.createPythonMetrics()
+}
+
+class PythonBatchWrite(
+ ds: PythonDataSourceV2,
+ shortName: String,
+ info: LogicalWriteInfo,
+ isTruncate: Boolean
+ ) extends BatchWrite {
// Store the pickled data source writer instance.
private var pythonDataSourceWriter: Array[Byte] = _
- override def createBatchWriterFactory(
- physicalInfo: PhysicalWriteInfo): DataWriterFactory = {
+ private[this] val jobArtifactUUID =
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+ override def createBatchWriterFactory(physicalInfo: PhysicalWriteInfo):
DataWriterFactory =
+ {
val writeInfo = ds.source.createWriteInfoInPython(
shortName,
info.schema(),
@@ -53,13 +70,4 @@ class PythonWrite(
override def abort(messages: Array[WriterCommitMessage]): Unit = {
ds.source.commitWriteInPython(pythonDataSourceWriter, messages, abort =
true)
}
-
- override def toString: String = shortName
-
- override def toBatch: BatchWrite = this
-
- override def description: String = "(Python)"
-
- override def supportedCustomMetrics(): Array[CustomMetric] =
- ds.source.createPythonMetrics()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]