Repository: spark
Updated Branches:
  refs/heads/master 5056877e8 -> 0a9ac0248


http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 6130448..ff028eb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -23,19 +23,18 @@ import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateSourceProvider, 
RateStreamOffset, ValueRunTimeMsPair}
 import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+import org.apache.spark.sql.types.StructType
 
 case class RateStreamPartitionOffset(
    partition: Int, currentValue: Long, currentTimeMs: Long) extends 
PartitionOffset
 
-class RateStreamContinuousReader(options: DataSourceV2Options)
+class RateStreamContinuousReader(options: DataSourceOptions)
   extends ContinuousReader {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
index 7c1700f..d46f4d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, 
WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
 
 /** Common methods used to create writes for the the console sink */
-class ConsoleWriter(schema: StructType, options: DataSourceV2Options)
+class ConsoleWriter(schema: StructType, options: DataSourceOptions)
     extends StreamWriter with Logging {
 
   // Number of rows to display, by default 20 rows

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
index d7f3ba8..d7ce9a7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWriter.scala
@@ -20,14 +20,14 @@ package org.apache.spark.sql.execution.streaming.sources
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
-import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, 
DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, 
DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
 
 /**
- * A [[DataSourceV2Writer]] used to hook V2 stream writers into a microbatch 
plan. It implements
+ * A [[DataSourceWriter]] used to hook V2 stream writers into a microbatch 
plan. It implements
  * the non-streaming interface, forwarding the batch ID determined at 
construction to a wrapped
  * streaming writer.
  */
-class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends 
DataSourceV2Writer {
+class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends 
DataSourceWriter {
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writer.commit(batchId, messages)
   }
@@ -38,7 +38,7 @@ class MicroBatchWriter(batchId: Long, writer: StreamWriter) 
extends DataSourceV2
 }
 
 class InternalRowMicroBatchWriter(batchId: Long, writer: StreamWriter)
-  extends DataSourceV2Writer with SupportsWriteInternalRow {
+  extends DataSourceWriter with SupportsWriteInternalRow {
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
     writer.commit(batchId, messages)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
index 9282ba0..248295e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
@@ -21,11 +21,11 @@ import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, 
WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, 
DataWriterFactory, WriterCommitMessage}
 
 /**
  * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit 
message for delivery
- * to a [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the 
driver.
+ * to a [[DataSourceWriter]] on the driver.
  *
  * Note that, because it sends all rows to the driver, this factory will 
generally be unsuitable
  * for production-quality sinks. It's intended for use in tests.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
index a25cc4f..43949e6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, 
ValueRunTimeMsPair}
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, 
Offset}
@@ -44,14 +44,14 @@ class RateSourceProviderV2 extends DataSourceV2 with 
MicroBatchReadSupport with
   override def createMicroBatchReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceV2Options): MicroBatchReader = {
+      options: DataSourceOptions): MicroBatchReader = {
     new RateStreamMicroBatchReader(options)
   }
 
   override def shortName(): String = "ratev2"
 }
 
-class RateStreamMicroBatchReader(options: DataSourceV2Options)
+class RateStreamMicroBatchReader(options: DataSourceOptions)
   extends MicroBatchReader {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index ce55e44..5876726 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, 
Complete, Update}
 import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
 import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
 import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
 import org.apache.spark.sql.sources.v2.writer._
@@ -45,7 +45,7 @@ class MemorySinkV2 extends DataSourceV2 with 
StreamWriteSupport with Logging {
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): StreamWriter = {
+      options: DataSourceOptions): StreamWriter = {
     new MemoryStreamWriter(this, mode)
   }
 
@@ -114,7 +114,7 @@ class MemorySinkV2 extends DataSourceV2 with 
StreamWriteSupport with Logging {
 case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends 
WriterCommitMessage {}
 
 class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
-  extends DataSourceV2Writer with Logging {
+  extends DataSourceWriter with Logging {
 
   override def createWriterFactory: MemoryWriterFactory = 
MemoryWriterFactory(outputMode)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9f5ca9f..f1b3f93 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, 
StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, 
MicroBatchReadSupport}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -158,7 +158,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
     }
 
     val ds = DataSource.lookupDataSource(source, 
sparkSession.sqlContext.conf).newInstance()
-    val options = new DataSourceV2Options(extraOptions.asJava)
+    val options = new DataSourceOptions(extraOptions.asJava)
     // We need to generate the V1 data source so we can pass it to the V2 
relation as a shim.
     // We can't be sure at this point whether we'll actually want to use V2, 
since we don't know the
     // writer or whether the query is continuous.

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 4026ee4..d421f7d 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -24,15 +24,15 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceV2Reader, SupportsPushDownRequiredColumns,
+  class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
       SupportsPushDownFilters {
 
     private StructType requiredSchema = new StructType().add("i", 
"int").add("j", "int");
@@ -131,7 +131,7 @@ public class JavaAdvancedDataSourceV2 implements 
DataSourceV2, ReadSupport {
 
 
   @Override
-  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
index 34e6c63..c550937 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.DataTypes;
@@ -33,7 +33,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceV2Reader, SupportsScanColumnarBatch {
+  class Reader implements DataSourceReader, SupportsScanColumnarBatch {
     private final StructType schema = new StructType().add("i", 
"int").add("j", "int");
 
     @Override
@@ -108,7 +108,7 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, 
ReadSupport {
 
 
   @Override
-  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index d0c8750..99cca0f 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -23,15 +23,15 @@ import java.util.List;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport 
{
 
-  class Reader implements DataSourceV2Reader, SupportsReportPartitioning {
+  class Reader implements DataSourceReader, SupportsReportPartitioning {
     private final StructType schema = new StructType().add("a", 
"int").add("b", "int");
 
     @Override
@@ -104,7 +104,7 @@ public class JavaPartitionAwareDataSource implements 
DataSourceV2, ReadSupport {
   }
 
   @Override
-  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index f997366..048d078 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -20,16 +20,16 @@ package test.org.apache.spark.sql.sources.v2;
 import java.util.List;
 
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, 
ReadSupportWithSchema {
 
-  class Reader implements DataSourceV2Reader {
+  class Reader implements DataSourceReader {
     private final StructType schema;
 
     Reader(StructType schema) {
@@ -48,7 +48,7 @@ public class JavaSchemaRequiredDataSource implements 
DataSourceV2, ReadSupportWi
   }
 
   @Override
-  public DataSourceV2Reader createReader(StructType schema, 
DataSourceV2Options options) {
+  public DataSourceReader createReader(StructType schema, DataSourceOptions 
options) {
     return new Reader(schema);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
index 2beed43..96f55b8 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
@@ -23,16 +23,16 @@ import java.util.List;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.DataReader;
 import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceV2Reader {
+  class Reader implements DataSourceReader {
     private final StructType schema = new StructType().add("i", 
"int").add("j", "int");
 
     @Override
@@ -80,7 +80,7 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, 
ReadSupport {
   }
 
   @Override
-  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
index e818752..c3916e0 100644
--- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
@@ -21,15 +21,15 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.ReadSupport;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
 
-  class Reader implements DataSourceV2Reader, SupportsScanUnsafeRow {
+  class Reader implements DataSourceReader, SupportsScanUnsafeRow {
     private final StructType schema = new StructType().add("i", 
"int").add("j", "int");
 
     @Override
@@ -83,7 +83,7 @@ public class JavaUnsafeRowDataSourceV2 implements 
DataSourceV2, ReadSupport {
   }
 
   @Override
-  public DataSourceV2Reader createReader(DataSourceV2Options options) {
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new Reader();
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index d2cfe79..b060aee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, 
RateStreamMicroBatchReader, RateStreamSourceV2}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, 
MicroBatchReadSupport}
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.util.ManualClock
@@ -49,7 +49,7 @@ class RateSourceV2Suite extends StreamTest {
   test("microbatch in registry") {
     DataSource.lookupDataSource("ratev2", spark.sqlContext.conf).newInstance() 
match {
       case ds: MicroBatchReadSupport =>
-        val reader = ds.createMicroBatchReader(Optional.empty(), "", 
DataSourceV2Options.empty())
+        val reader = ds.createMicroBatchReader(Optional.empty(), "", 
DataSourceOptions.empty())
         assert(reader.isInstanceOf[RateStreamMicroBatchReader])
       case _ =>
         throw new IllegalStateException("Could not find v2 read support for 
rate")
@@ -76,14 +76,14 @@ class RateSourceV2Suite extends StreamTest {
 
   test("microbatch - numPartitions propagated") {
     val reader = new RateStreamMicroBatchReader(
-      new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> 
"33").asJava))
+      new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> 
"33").asJava))
     reader.setOffsetRange(Optional.empty(), Optional.empty())
     val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 11)
   }
 
   test("microbatch - set offset") {
-    val reader = new RateStreamMicroBatchReader(DataSourceV2Options.empty())
+    val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty())
     val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
     val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 2000))))
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@@ -93,7 +93,7 @@ class RateSourceV2Suite extends StreamTest {
 
   test("microbatch - infer offsets") {
     val reader = new RateStreamMicroBatchReader(
-      new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> 
"100").asJava))
+      new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> 
"100").asJava))
     reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
     reader.setOffsetRange(Optional.empty(), Optional.empty())
     reader.getStartOffset() match {
@@ -114,7 +114,7 @@ class RateSourceV2Suite extends StreamTest {
 
   test("microbatch - predetermined batch size") {
     val reader = new RateStreamMicroBatchReader(
-      new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> 
"20").asJava))
+      new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> 
"20").asJava))
     val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
     val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(20, 2000))))
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@@ -125,7 +125,7 @@ class RateSourceV2Suite extends StreamTest {
 
   test("microbatch - data read") {
     val reader = new RateStreamMicroBatchReader(
-      new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> 
"33").asJava))
+      new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> 
"33").asJava))
     val startOffset = RateStreamSourceV2.createInitialOffset(11, 
reader.creationTimeMs)
     val endOffset = 
RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map {
       case (part, ValueRunTimeMsPair(currentVal, currentReadTime)) =>
@@ -150,7 +150,7 @@ class RateSourceV2Suite extends StreamTest {
   test("continuous in registry") {
     DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() 
match {
       case ds: ContinuousReadSupport =>
-        val reader = ds.createContinuousReader(Optional.empty(), "", 
DataSourceV2Options.empty())
+        val reader = ds.createContinuousReader(Optional.empty(), "", 
DataSourceOptions.empty())
         assert(reader.isInstanceOf[RateStreamContinuousReader])
       case _ =>
         throw new IllegalStateException("Could not find v2 read support for 
rate")
@@ -159,7 +159,7 @@ class RateSourceV2Suite extends StreamTest {
 
   test("continuous data") {
     val reader = new RateStreamContinuousReader(
-      new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> 
"20").asJava))
+      new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> 
"20").asJava))
     reader.setOffset(Optional.empty())
     val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
new file mode 100644
index 0000000..31dfc55
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * A simple test suite to verify `DataSourceOptions`.
+ */
+class DataSourceOptionsSuite extends SparkFunSuite {
+
+  test("key is case-insensitive") {
+    val options = new DataSourceOptions(Map("foo" -> "bar").asJava)
+    assert(options.get("foo").get() == "bar")
+    assert(options.get("FoO").get() == "bar")
+    assert(!options.get("abc").isPresent)
+  }
+
+  test("value is case-sensitive") {
+    val options = new DataSourceOptions(Map("foo" -> "bAr").asJava)
+    assert(options.get("foo").get == "bAr")
+  }
+
+  test("getInt") {
+    val options = new DataSourceOptions(Map("numFOo" -> "1", "foo" -> 
"bar").asJava)
+    assert(options.getInt("numFOO", 10) == 1)
+    assert(options.getInt("numFOO2", 10) == 10)
+
+    intercept[NumberFormatException]{
+      options.getInt("foo", 1)
+    }
+  }
+
+  test("getBoolean") {
+    val options = new DataSourceOptions(
+      Map("isFoo" -> "true", "isFOO2" -> "false", "foo" -> "bar").asJava)
+    assert(options.getBoolean("isFoo", false))
+    assert(!options.getBoolean("isFoo2", true))
+    assert(options.getBoolean("isBar", true))
+    assert(!options.getBoolean("isBar", false))
+    assert(!options.getBoolean("FOO", true))
+  }
+
+  test("getLong") {
+    val options = new DataSourceOptions(Map("numFoo" -> "9223372036854775807",
+      "foo" -> "bar").asJava)
+    assert(options.getLong("numFOO", 0L) == 9223372036854775807L)
+    assert(options.getLong("numFoo2", -1L) == -1L)
+
+    intercept[NumberFormatException]{
+      options.getLong("foo", 0L)
+    }
+  }
+
+  test("getDouble") {
+    val options = new DataSourceOptions(Map("numFoo" -> "922337.1",
+      "foo" -> "bar").asJava)
+    assert(options.getDouble("numFOO", 0d) == 922337.1d)
+    assert(options.getDouble("numFoo2", -1.02d) == -1.02d)
+
+    intercept[NumberFormatException]{
+      options.getDouble("foo", 0.1d)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
deleted file mode 100644
index 90d9286..0000000
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2OptionsSuite.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.SparkFunSuite
-
-/**
- * A simple test suite to verify `DataSourceV2Options`.
- */
-class DataSourceV2OptionsSuite extends SparkFunSuite {
-
-  test("key is case-insensitive") {
-    val options = new DataSourceV2Options(Map("foo" -> "bar").asJava)
-    assert(options.get("foo").get() == "bar")
-    assert(options.get("FoO").get() == "bar")
-    assert(!options.get("abc").isPresent)
-  }
-
-  test("value is case-sensitive") {
-    val options = new DataSourceV2Options(Map("foo" -> "bAr").asJava)
-    assert(options.get("foo").get == "bAr")
-  }
-
-  test("getInt") {
-    val options = new DataSourceV2Options(Map("numFOo" -> "1", "foo" -> "bar").asJava)
-    assert(options.getInt("numFOO", 10) == 1)
-    assert(options.getInt("numFOO2", 10) == 10)
-
-    intercept[NumberFormatException]{
-      options.getInt("foo", 1)
-    }
-  }
-
-  test("getBoolean") {
-    val options = new DataSourceV2Options(
-      Map("isFoo" -> "true", "isFOO2" -> "false", "foo" -> "bar").asJava)
-    assert(options.getBoolean("isFoo", false))
-    assert(!options.getBoolean("isFoo2", true))
-    assert(options.getBoolean("isBar", true))
-    assert(!options.getBoolean("isBar", false))
-    assert(!options.getBoolean("FOO", true))
-  }
-
-  test("getLong") {
-    val options = new DataSourceV2Options(Map("numFoo" -> "9223372036854775807",
-      "foo" -> "bar").asJava)
-    assert(options.getLong("numFOO", 0L) == 9223372036854775807L)
-    assert(options.getLong("numFoo2", -1L) == -1L)
-
-    intercept[NumberFormatException]{
-      options.getLong("foo", 0L)
-    }
-  }
-
-  test("getDouble") {
-    val options = new DataSourceV2Options(Map("numFoo" -> "922337.1",
-      "foo" -> "bar").asJava)
-    assert(options.getDouble("numFOO", 0d) == 922337.1d)
-    assert(options.getDouble("numFoo2", -1.02d) == -1.02d)
-
-    intercept[NumberFormatException]{
-      options.getDouble("foo", 0.1d)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 42c5d3b..ee50e8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -201,7 +201,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
 
 class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceV2Reader {
+  class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
     override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
@@ -209,7 +209,7 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
     }
   }
 
-  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
+  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
 class SimpleDataReaderFactory(start: Int, end: Int)
@@ -233,7 +233,7 @@ class SimpleDataReaderFactory(start: Int, end: Int)
 
 class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceV2Reader
+  class Reader extends DataSourceReader
     with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
 
     var requiredSchema = new StructType().add("i", "int").add("j", "int")
@@ -275,7 +275,7 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
     }
   }
 
-  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
+  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
 class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
@@ -306,7 +306,7 @@ class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType
 
 class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceV2Reader with SupportsScanUnsafeRow {
+  class Reader extends DataSourceReader with SupportsScanUnsafeRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
     override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
@@ -315,7 +315,7 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
     }
   }
 
-  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
+  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
 class UnsafeRowDataReaderFactory(start: Int, end: Int)
@@ -343,18 +343,18 @@ class UnsafeRowDataReaderFactory(start: Int, end: Int)
 
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
-  class Reader(val readSchema: StructType) extends DataSourceV2Reader {
+  class Reader(val readSchema: StructType) extends DataSourceReader {
     override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
       java.util.Collections.emptyList()
   }
 
-  override def createReader(schema: StructType, options: DataSourceV2Options): DataSourceV2Reader =
+  override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
     new Reader(schema)
 }
 
 class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceV2Reader with SupportsScanColumnarBatch {
+  class Reader extends DataSourceReader with SupportsScanColumnarBatch {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
     override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
@@ -362,7 +362,7 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
     }
   }
 
-  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
+  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
 class BatchDataReaderFactory(start: Int, end: Int)
@@ -406,7 +406,7 @@ class BatchDataReaderFactory(start: Int, end: Int)
 
 class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
 
-  class Reader extends DataSourceV2Reader with SupportsReportPartitioning {
+  class Reader extends DataSourceReader with SupportsReportPartitioning {
     override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
 
     override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
@@ -428,7 +428,7 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
     }
   }
 
-  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
+  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
 class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 3310d6d..a131b16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceV2Reader}
+import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -42,7 +42,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
 
   private val schema = new StructType().add("i", "long").add("j", "long")
 
-  class Reader(path: String, conf: Configuration) extends DataSourceV2Reader {
+  class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
 
     override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
@@ -64,7 +64,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
     }
   }
 
-  class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceV2Writer {
+  class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceWriter {
     override def createWriterFactory(): DataWriterFactory[Row] = {
       new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
     }
@@ -104,7 +104,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
     }
   }
 
-  override def createReader(options: DataSourceV2Options): DataSourceV2Reader = {
+  override def createReader(options: DataSourceOptions): DataSourceReader = {
     val path = new Path(options.get("path").get())
     val conf = SparkContext.getActive.get.hadoopConfiguration
     new Reader(path.toUri.toString, conf)
@@ -114,7 +114,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
       jobId: String,
       schema: StructType,
       mode: SaveMode,
-      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
+      options: DataSourceOptions): Optional[DataSourceWriter] = {
     assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
 
@@ -141,7 +141,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   }
 
   private def createWriter(
-      jobId: String, path: Path, conf: Configuration, internal: Boolean): DataSourceV2Writer = {
+      jobId: String, path: Path, conf: Configuration, internal: Boolean): DataSourceWriter = {
     val pathStr = path.toUri.toString
     if (internal) {
       new InternalRowWriter(jobId, pathStr, conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/0a9ac024/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index dc8c857..3127d66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, Streami
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
-import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
 import org.apache.spark.sql.sources.v2.streaming._
 import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
@@ -54,14 +54,14 @@ trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
   override def createMicroBatchReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceV2Options): MicroBatchReader = FakeReader()
+      options: DataSourceOptions): MicroBatchReader = FakeReader()
 }
 
 trait FakeContinuousReadSupport extends ContinuousReadSupport {
   override def createContinuousReader(
       schema: Optional[StructType],
       checkpointLocation: String,
-      options: DataSourceV2Options): ContinuousReader = FakeReader()
+      options: DataSourceOptions): ContinuousReader = FakeReader()
 }
 
 trait FakeStreamWriteSupport extends StreamWriteSupport {
@@ -69,7 +69,7 @@ trait FakeStreamWriteSupport extends StreamWriteSupport {
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceV2Options): StreamWriter = {
+      options: DataSourceOptions): StreamWriter = {
     throw new IllegalStateException("fake sink - cannot actually write")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

Reply via email to