Repository: spark
Updated Branches:
  refs/heads/master 65338de5f -> 44c400315


[SPARK-22400][SQL] rename some APIs and classes to make their meaning clearer

## What changes were proposed in this pull request?

Both `ReadSupport` and `ReadTask` have a method called `createReader`, but they 
create different things. This could cause some confusion for data source 
developers. The same issue exists between `WriteSupport` and 
`DataWriterFactory`, both of which have a method called `createWriter`. This PR 
renames the method of `ReadTask`/`DataWriterFactory` to 
`createDataReader`/`createDataWriter`.

Besides, the name of `RowToInternalRowDataWriterFactory` is not correct, 
because it actually converts `InternalRow`s to `Row`s. It should be renamed 
`InternalRowDataWriterFactory`.

## How was this patch tested?

Only renaming, should be covered by existing tests.

Author: Zhenhua Wang <[email protected]>

Closes #19610 from wzhfy/rename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44c40031
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44c40031
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44c40031

Branch: refs/heads/master
Commit: 44c4003155c1d243ffe0f73d5537b4c8b3f3b564
Parents: 65338de
Author: Zhenhua Wang <[email protected]>
Authored: Mon Oct 30 10:21:05 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Mon Oct 30 10:21:05 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/sources/v2/reader/DataReader.java       |  4 ++--
 .../apache/spark/sql/sources/v2/reader/ReadTask.java  |  2 +-
 .../spark/sql/sources/v2/writer/DataWriter.java       |  6 +++---
 .../sql/sources/v2/writer/DataWriterFactory.java      |  2 +-
 .../sql/execution/datasources/v2/DataSourceRDD.scala  |  2 +-
 .../datasources/v2/DataSourceV2ScanExec.scala         |  5 +++--
 .../datasources/v2/WriteToDataSourceV2.scala          | 14 +++++++-------
 .../sql/sources/v2/JavaAdvancedDataSourceV2.java      |  2 +-
 .../spark/sql/sources/v2/JavaSimpleDataSourceV2.java  |  2 +-
 .../sql/sources/v2/JavaUnsafeRowDataSourceV2.java     |  2 +-
 .../spark/sql/sources/v2/DataSourceV2Suite.scala      |  8 +++++---
 .../sql/sources/v2/SimpleWritableDataSource.scala     |  9 ++++-----
 12 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
index 95e0915..52bb138 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java
@@ -22,8 +22,8 @@ import java.io.Closeable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data reader returned by {@link ReadTask#createReader()} and is 
responsible for outputting data
- * for a RDD partition.
+ * A data reader returned by {@link ReadTask#createDataReader()} and is 
responsible for
+ * outputting data for a RDD partition.
  *
  * Note that, Currently the type `T` can only be {@link 
org.apache.spark.sql.Row} for normal data
  * source readers, or {@link 
org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
index 01362df..44786db 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ReadTask.java
@@ -45,5 +45,5 @@ public interface ReadTask<T> extends Serializable {
   /**
    * Returns a data reader to do the actual reading work for this read task.
    */
-  DataReader<T> createReader();
+  DataReader<T> createDataReader();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 1426141..d84afba 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.writer;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data writer returned by {@link DataWriterFactory#createWriter(int, int)} 
and is
+ * A data writer returned by {@link DataWriterFactory#createDataWriter(int, 
int)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe 
concern.
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.InterfaceStability;
  * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} with commit 
messages from other data
  * writers. If this data writer fails(one record fails to write or {@link 
#commit()} fails), an
  * exception will be sent to the driver side, and Spark will retry this 
writing task for some times,
- * each time {@link DataWriterFactory#createWriter(int, int)} gets a different 
`attemptNumber`,
+ * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a 
different `attemptNumber`,
  * and finally call {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if 
all retry fail.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the 
existing writing task
@@ -64,7 +64,7 @@ public interface DataWriter<T> {
 
   /**
    * Commits this writer after all records are written successfully, returns a 
commit message which
-   * will be send back to driver side and pass to
+   * will be sent back to driver side and passed to
    * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
    *
    * The written data should only be visible to data source readers after

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index f812d10..fe56cc0 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -46,5 +46,5 @@ public interface DataWriterFactory<T> extends Serializable {
    *                      tasks with the same task id running at the same 
time. Implementations can
    *                      use this attempt number to distinguish writers of 
different task attempts.
    */
-  DataWriter<T> createWriter(int partitionId, int attemptNumber);
+  DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index b8fe5ac..5f30be5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -39,7 +39,7 @@ class DataSourceRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
-    val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createReader()
+    val reader = 
split.asInstanceOf[DataSourceRDDPartition].readTask.createDataReader()
     context.addTaskCompletionListener(_ => reader.close())
     val iter = new Iterator[UnsafeRow] {
       private[this] var valuePrepared = false

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index addc12a..3f243dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -67,8 +67,9 @@ class RowToUnsafeRowReadTask(rowReadTask: ReadTask[Row], 
schema: StructType)
 
   override def preferredLocations: Array[String] = 
rowReadTask.preferredLocations
 
-  override def createReader: DataReader[UnsafeRow] = {
-    new RowToUnsafeDataReader(rowReadTask.createReader, 
RowEncoder.apply(schema).resolveAndBind())
+  override def createDataReader: DataReader[UnsafeRow] = {
+    new RowToUnsafeDataReader(
+      rowReadTask.createDataReader, RowEncoder.apply(schema).resolveAndBind())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 92c1e1f..b72d15e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -48,7 +48,7 @@ case class WriteToDataSourceV2Exec(writer: 
DataSourceV2Writer, query: SparkPlan)
   override protected def doExecute(): RDD[InternalRow] = {
     val writeTask = writer match {
       case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
-      case _ => new 
RowToInternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), 
query.schema)
     }
 
     val rdd = query.execute()
@@ -93,7 +93,7 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createWriter(context.partitionId(), 
context.attemptNumber())
+    val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -111,18 +111,18 @@ object DataWritingSparkTask extends Logging {
   }
 }
 
-class RowToInternalRowDataWriterFactory(
+class InternalRowDataWriterFactory(
     rowWriterFactory: DataWriterFactory[Row],
     schema: StructType) extends DataWriterFactory[InternalRow] {
 
-  override def createWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
-    new RowToInternalRowDataWriter(
-      rowWriterFactory.createWriter(partitionId, attemptNumber),
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+    new InternalRowDataWriter(
+      rowWriterFactory.createDataWriter(partitionId, attemptNumber),
       RowEncoder.apply(schema).resolveAndBind())
   }
 }
 
-class RowToInternalRowDataWriter(rowWriter: DataWriter[Row], encoder: 
ExpressionEncoder[Row])
+class InternalRowDataWriter(rowWriter: DataWriter[Row], encoder: 
ExpressionEncoder[Row])
   extends DataWriter[InternalRow] {
 
   override def write(record: InternalRow): Unit = 
rowWriter.write(encoder.fromRow(record))

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index da2c13f..1cfdc08 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -100,7 +100,7 @@ public class JavaAdvancedDataSourceV2 implements 
DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<Row> createReader() {
+    public DataReader<Row> createDataReader() {
       return new JavaAdvancedReadTask(start - 1, end, requiredSchema);
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 08469f1..2d458b7 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -58,7 +58,7 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, 
ReadSupport {
     }
 
     @Override
-    public DataReader<Row> createReader() {
+    public DataReader<Row> createDataReader() {
       return new JavaSimpleReadTask(start - 1, end);
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
index 9efe7c7..f6aa008 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -58,7 +58,7 @@ public class JavaUnsafeRowDataSourceV2 implements 
DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<UnsafeRow> createReader() {
+    public DataReader<UnsafeRow> createDataReader() {
       return new JavaUnsafeRowReadTask(start - 1, end);
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 092702a..ab37e49 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -167,7 +167,7 @@ class SimpleDataSourceV2 extends DataSourceV2 with 
ReadSupport {
 class SimpleReadTask(start: Int, end: Int) extends ReadTask[Row] with 
DataReader[Row] {
   private var current = start - 1
 
-  override def createReader(): DataReader[Row] = new SimpleReadTask(start, end)
+  override def createDataReader(): DataReader[Row] = new SimpleReadTask(start, 
end)
 
   override def next(): Boolean = {
     current += 1
@@ -233,7 +233,9 @@ class AdvancedReadTask(start: Int, end: Int, 
requiredSchema: StructType)
 
   private var current = start - 1
 
-  override def createReader(): DataReader[Row] = new AdvancedReadTask(start, 
end, requiredSchema)
+  override def createDataReader(): DataReader[Row] = {
+    new AdvancedReadTask(start, end, requiredSchema)
+  }
 
   override def close(): Unit = {}
 
@@ -273,7 +275,7 @@ class UnsafeRowReadTask(start: Int, end: Int)
 
   private var current = start - 1
 
-  override def createReader(): DataReader[UnsafeRow] = new 
UnsafeRowReadTask(start, end)
+  override def createDataReader(): DataReader[UnsafeRow] = new 
UnsafeRowReadTask(start, end)
 
   override def next(): Boolean = {
     current += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/44c40031/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 6fb60f4..cd7252e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.sources.v2
 
 import java.io.{BufferedReader, InputStreamReader, IOException}
-import java.text.SimpleDateFormat
-import java.util.{Collections, Date, List => JList, Locale, Optional, UUID}
+import java.util.{Collections, List => JList, Optional}
 
 import scala.collection.JavaConverters._
 
@@ -157,7 +156,7 @@ class SimpleCSVReadTask(path: String, conf: 
SerializableConfiguration)
   @transient private var currentLine: String = _
   @transient private var inputStream: FSDataInputStream = _
 
-  override def createReader(): DataReader[Row] = {
+  override def createDataReader(): DataReader[Row] = {
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(conf.value)
     inputStream = fs.open(filePath)
@@ -185,7 +184,7 @@ class SimpleCSVReadTask(path: String, conf: 
SerializableConfiguration)
 class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: 
SerializableConfiguration)
   extends DataWriterFactory[Row] {
 
-  override def createWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[Row] = {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[Row] = {
     val jobPath = new Path(new Path(path, "_temporary"), jobId)
     val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
     val fs = filePath.getFileSystem(conf.value)
@@ -218,7 +217,7 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) 
extends DataWriter[Row] {
 class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: 
SerializableConfiguration)
   extends DataWriterFactory[InternalRow] {
 
-  override def createWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
     val jobPath = new Path(new Path(path, "_temporary"), jobId)
     val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
     val fs = filePath.getFileSystem(conf.value)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to