Repository: spark
Updated Branches:
  refs/heads/master 568398452 -> 32ec269d0


[SPARK-22909][SS] Move Structured Streaming v2 APIs to streaming folder

## What changes were proposed in this pull request?

This PR moves the Structured Streaming v2 APIs into a dedicated `streaming` folder, as follows:
```
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming
├── ContinuousReadSupport.java
├── ContinuousWriteSupport.java
├── MicroBatchReadSupport.java
├── MicroBatchWriteSupport.java
├── reader
│   ├── ContinuousDataReader.java
│   ├── ContinuousReader.java
│   ├── MicroBatchReader.java
│   ├── Offset.java
│   └── PartitionOffset.java
└── writer
    └── ContinuousWriter.java
```
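
Downstream code keeps the same class bodies; only the package declarations and imports change. A sketch of the import update a client would make (the grouping below is illustrative):

```java
// Before this PR:
// import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
// import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
// import org.apache.spark.sql.sources.v2.writer.ContinuousWriter;

// After this PR:
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
```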

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #20093 from zsxwing/move.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32ec269d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32ec269d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32ec269d

Branch: refs/heads/master
Commit: 32ec269d08313720aae3b47cce2f5e9c19702811
Parents: 5683984
Author: Shixiong Zhu <[email protected]>
Authored: Thu Dec 28 12:35:17 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Dec 28 12:35:17 2017 +0800

----------------------------------------------------------------------
 .../sql/sources/v2/ContinuousReadSupport.java   | 42 -----------
 .../sql/sources/v2/ContinuousWriteSupport.java  | 54 --------------
 .../sql/sources/v2/MicroBatchReadSupport.java   | 52 --------------
 .../sql/sources/v2/MicroBatchWriteSupport.java  | 58 ---------------
 .../sources/v2/reader/ContinuousDataReader.java | 36 ----------
 .../sql/sources/v2/reader/ContinuousReader.java | 74 --------------------
 .../sql/sources/v2/reader/MicroBatchReader.java | 70 ------------------
 .../spark/sql/sources/v2/reader/Offset.java     | 60 ----------------
 .../sql/sources/v2/reader/PartitionOffset.java  | 30 --------
 .../v2/streaming/ContinuousReadSupport.java     | 43 ++++++++++++
 .../v2/streaming/ContinuousWriteSupport.java    | 56 +++++++++++++++
 .../v2/streaming/MicroBatchReadSupport.java     | 54 ++++++++++++++
 .../v2/streaming/MicroBatchWriteSupport.java    | 60 ++++++++++++++++
 .../streaming/reader/ContinuousDataReader.java  | 34 +++++++++
 .../v2/streaming/reader/ContinuousReader.java   | 74 ++++++++++++++++++++
 .../v2/streaming/reader/MicroBatchReader.java   | 70 ++++++++++++++++++
 .../sql/sources/v2/streaming/reader/Offset.java | 60 ++++++++++++++++
 .../v2/streaming/reader/PartitionOffset.java    | 30 ++++++++
 .../v2/streaming/writer/ContinuousWriter.java   | 44 ++++++++++++
 .../sql/sources/v2/writer/ContinuousWriter.java | 41 -----------
 .../datasources/v2/DataSourceV2ScanExec.scala   |  1 +
 .../datasources/v2/WriteToDataSourceV2.scala    |  1 +
 .../streaming/MicroBatchExecution.scala         |  2 +-
 .../streaming/RateSourceProvider.scala          |  3 +-
 .../execution/streaming/RateStreamOffset.scala  |  2 +-
 .../execution/streaming/StreamingRelation.scala |  3 +-
 .../ContinuousDataSourceRDDIter.scala           |  1 +
 .../continuous/ContinuousExecution.scala        |  7 +-
 .../continuous/ContinuousRateStreamSource.scala |  3 +-
 .../streaming/continuous/EpochCoordinator.scala |  5 +-
 .../streaming/sources/RateStreamSourceV2.scala  |  1 +
 .../execution/streaming/sources/memoryV2.scala  |  4 +-
 .../spark/sql/streaming/DataStreamReader.scala  |  3 +-
 .../sql/streaming/StreamingQueryManager.scala   |  2 +-
 .../execution/streaming/OffsetSeqLogSuite.scala |  1 -
 .../execution/streaming/RateSourceSuite.scala   |  1 -
 .../execution/streaming/RateSourceV2Suite.scala |  3 +-
 37 files changed, 552 insertions(+), 533 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
deleted file mode 100644
index ae4f858..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data reading ability for continuous stream processing.
- */
-public interface ContinuousReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link ContinuousReader} to scan the data from this data source.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
-   *                           recovery. Readers for the same logical source 
in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is 
an immutable
-   *                case-insensitive string-to-string map.
-   */
-  ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceV2Options options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
deleted file mode 100644
index 362d5f5..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.ContinuousWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability for continuous stream processing.
- */
[email protected]
-public interface ContinuousWriteSupport extends BaseStreamingSink {
-
-    /**
-     * Creates an optional {@link ContinuousWriter} to save the data to this 
data source. Data
-     * sources can return None if there is no writing to be done.
-     *
-     * @param queryId A unique string for the writing query. It's possible 
that there are many writing
-     *                queries running at the same time, and the returned 
{@link DataSourceV2Writer}
-     *                can use this id to distinguish itself from others.
-     * @param schema the schema of the data to be written.
-     * @param mode the output mode which determines what successive epoch 
output means to this
-     *             sink, please refer to {@link OutputMode} for more details.
-     * @param options the options for the returned data source writer, which 
is an immutable
-     *                case-insensitive string-to-string map.
-     */
-    Optional<ContinuousWriter> createContinuousWriter(
-        String queryId,
-        StructType schema,
-        OutputMode mode,
-        DataSourceV2Options options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
deleted file mode 100644
index 442cad0..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.MicroBatchReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide streaming micro-batch data reading ability.
- */
[email protected]
-public interface MicroBatchReadSupport extends DataSourceV2 {
-  /**
-   * Creates a {@link MicroBatchReader} to read batches of data from this data 
source in a
-   * streaming query.
-   *
-   * The execution engine will create a micro-batch reader at the start of a 
streaming query,
-   * alternate calls to setOffsetRange and createReadTasks for each batch to 
process, and then
-   * call stop() when the execution is complete. Note that a single query may 
have multiple
-   * executions due to restart or failure recovery.
-   *
-   * @param schema the user provided schema, or empty() if none was provided
-   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
-   *                           recovery. Readers for the same logical source 
in the same query
-   *                           will be given the same checkpointLocation.
-   * @param options the options for the returned data source reader, which is 
an immutable
-   *                case-insensitive string-to-string map.
-   */
-  MicroBatchReader createMicroBatchReader(
-      Optional<StructType> schema,
-      String checkpointLocation,
-      DataSourceV2Options options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
deleted file mode 100644
index 6364077..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
- * provide data writing ability and save the data from a microbatch to the 
data source.
- */
[email protected]
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
-  /**
-   * Creates an optional {@link DataSourceV2Writer} to save the data to this 
data source. Data
-   * sources can return None if there is no writing to be done.
-   *
-   * @param queryId A unique string for the writing query. It's possible that 
there are many writing
-   *                queries running at the same time, and the returned {@link 
DataSourceV2Writer}
-   *                can use this id to distinguish itself from others.
-   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
-   *                incrementing counter representing a consistent set of 
data; the same batch may
-   *                be started multiple times in failure recovery scenarios, 
but it will always
-   *                contain the same records.
-   * @param schema the schema of the data to be written.
-   * @param mode the output mode which determines what successive batch output 
means to this
-   *             sink, please refer to {@link OutputMode} for more details.
-   * @param options the options for the returned data source writer, which is 
an immutable
-   *                case-insensitive string-to-string map.
-   */
-  Optional<DataSourceV2Writer> createMicroBatchWriter(
-      String queryId,
-      long epochId,
-      StructType schema,
-      OutputMode mode,
-      DataSourceV2Options options);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
deleted file mode 100644
index 11b99a9..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
-
-import java.io.IOException;
-
-/**
- * A variation on {@link DataReader} for use with streaming in continuous 
processing mode.
- */
-public interface ContinuousDataReader<T> extends DataReader<T> {
-    /**
-     * Get the offset of the current record, or the start offset if no records 
have been read.
-     *
-     * The execution engine will call this method along with get() to keep 
track of the current
-     * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
-     * as a restart checkpoint.
-     */
-    PartitionOffset getOffset();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
deleted file mode 100644
index 34141d6..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can 
implement this
- * interface to allow reading in a continuous processing mode stream.
- *
- * Implementations must ensure each read task output is a {@link 
ContinuousDataReader}.
- */
-public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
-    /**
-     * Merge offsets coming from {@link ContinuousDataReader} instances in 
each partition to
-     * a single global offset.
-     */
-    Offset mergeOffsets(PartitionOffset[] offsets);
-
-    /**
-     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
-     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
-     */
-    Offset deserializeOffset(String json);
-
-    /**
-     * Set the desired start offset for read tasks created from this reader. 
The scan will start
-     * from the first record after the provided offset, or from an 
implementation-defined inferred
-     * starting point if no offset is provided.
-     */
-    void setOffset(Optional<Offset> start);
-
-    /**
-     * Return the specified or inferred start offset for this reader.
-     *
-     * @throws IllegalStateException if setOffset has not been called
-     */
-    Offset getStartOffset();
-
-    /**
-     * The execution engine will call this method in every epoch to determine 
if new read tasks need
-     * to be generated, which may be required if for example the underlying 
source system has had
-     * partitions added or removed.
-     *
-     * If true, the query will be shut down and restarted with a new reader.
-     */
-    default boolean needsReconfiguration() {
-        return false;
-    }
-
-    /**
-     * Informs the source that Spark has completed processing all data for 
offsets less than or
-     * equal to `end` and will only request offsets greater than `end` in the 
future.
-     */
-    void commit(Offset end);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
deleted file mode 100644
index bd15c07..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.sql.sources.v2.reader.Offset;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can 
implement this
- * interface to indicate they allow micro-batch streaming reads.
- */
-public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
-    /**
-     * Set the desired offset range for read tasks created from this reader. 
Read tasks will
-     * generate only data within (`start`, `end`]; that is, from the first 
record after `start` to
-     * the record with offset `end`.
-     *
-     * @param start The initial offset to scan from. If not specified, scan 
from an
-     *              implementation-specified start point, such as the earliest 
available record.
-     * @param end The last offset to include in the scan. If not specified, 
scan up to an
-     *            implementation-defined endpoint, such as the last available 
offset
-     *            or the start offset plus a target batch size.
-     */
-    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
-
-    /**
-     * Returns the specified (if explicitly set through setOffsetRange) or 
inferred start offset
-     * for this reader.
-     *
-     * @throws IllegalStateException if setOffsetRange has not been called
-     */
-    Offset getStartOffset();
-
-    /**
-     * Return the specified (if explicitly set through setOffsetRange) or 
inferred end offset
-     * for this reader.
-     *
-     * @throws IllegalStateException if setOffsetRange has not been called
-     */
-    Offset getEndOffset();
-
-    /**
-     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
-     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
-     */
-    Offset deserializeOffset(String json);
-
-    /**
-     * Informs the source that Spark has completed processing all data for 
offsets less than or
-     * equal to `end` and will only request offsets greater than `end` in the 
future.
-     */
-    void commit(Offset end);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
deleted file mode 100644
index ce1c489..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-/**
- * An abstract representation of progress through a [[MicroBatchReader]] or 
[[ContinuousReader]].
- * During execution, Offsets provided by the data source implementation will 
be logged and used as
- * restart checkpoints. Sources should provide an Offset implementation which 
they can use to
- * reconstruct the stream position where the offset was taken.
- */
-public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
-    /**
-     * A JSON-serialized representation of an Offset that is
-     * used for saving offsets to the offset log.
-     * Note: We assume that equivalent/equal offsets serialize to
-     * identical JSON strings.
-     *
-     * @return JSON string encoding
-     */
-    public abstract String json();
-
-    /**
-     * Equality based on JSON string representation. We leverage the
-     * JSON representation for normalization between the Offset's
-     * in memory and on disk representations.
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
-            return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return this.json().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return this.json();
-    }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
deleted file mode 100644
index 07826b6..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Serializable;
-
-/**
- * Used for per-partition offsets in continuous processing. ContinuousReader 
implementations will
- * provide a method to merge these into a global Offset.
- *
- * These offsets must be serializable.
- */
-public interface PartitionOffset extends Serializable {
-    
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
new file mode 100644
index 0000000..8837bae
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data reading ability for continuous stream processing.
+ */
+public interface ContinuousReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link ContinuousReader} to scan the data from this data source.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
+   *                           recovery. Readers for the same logical source 
in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceV2Options options);
+}
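
For illustration, a minimal sketch (not part of this commit) of a source opting in to continuous reads; `MyContinuousReader` stands for some implementation of the ContinuousReader interface that appears later in this diff:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.types.StructType;

public class MyContinuousSource implements ContinuousReadSupport {
  @Override
  public ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    // Spark hands the same checkpointLocation to this source across restarts
    // of the query, so the reader can recover its position from it.
    return new MyContinuousReader(schema, checkpointLocation, options);
  }
}
```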

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
new file mode 100644
index 0000000..ec15e43
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability for continuous stream processing.
+ */
[email protected]
+public interface ContinuousWriteSupport extends BaseStreamingSink {
+
+    /**
+     * Creates an optional {@link ContinuousWriter} to save the data to this 
data source. Data
+     * sources can return None if there is no writing to be done.
+     *
+     * @param queryId A unique string for the writing query. It's possible 
that there are many writing
+     *                queries running at the same time, and the returned 
{@link DataSourceV2Writer}
+     *                can use this id to distinguish itself from others.
+     * @param schema the schema of the data to be written.
+     * @param mode the output mode which determines what successive epoch 
output means to this
+     *             sink, please refer to {@link OutputMode} for more details.
+     * @param options the options for the returned data source writer, which 
is an immutable
+     *                case-insensitive string-to-string map.
+     */
+    Optional<ContinuousWriter> createContinuousWriter(
+        String queryId,
+        StructType schema,
+        OutputMode mode,
+        DataSourceV2Options options);
+}
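
A sketch of the Optional contract described in the javadoc above: a sink may decline to write by returning Optional.empty(). The `dryRun` option and `MyContinuousWriter` are hypothetical:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport;
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

public class MyContinuousSink implements ContinuousWriteSupport {
  @Override
  public Optional<ContinuousWriter> createContinuousWriter(
      String queryId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options) {
    // Hypothetical "dryRun" option: when set, there is no writing to be done
    // and the engine skips the write entirely.
    if (options.get("dryRun").isPresent()) {
      return Optional.empty();
    }
    return Optional.of(new MyContinuousWriter(queryId, schema, mode, options));
  }
}
```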

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
new file mode 100644
index 0000000..3c87a3d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide streaming micro-batch data reading ability.
+ */
[email protected]
+public interface MicroBatchReadSupport extends DataSourceV2 {
+  /**
+   * Creates a {@link MicroBatchReader} to read batches of data from this data 
source in a
+   * streaming query.
+   *
+   * The execution engine will create a micro-batch reader at the start of a 
streaming query,
+   * alternate calls to setOffsetRange and createReadTasks for each batch to 
process, and then
+   * call stop() when the execution is complete. Note that a single query may 
have multiple
+   * executions due to restart or failure recovery.
+   *
+   * @param schema the user provided schema, or empty() if none was provided
+   * @param checkpointLocation a path to Hadoop FS scratch space that can be 
used for failure
+   *                           recovery. Readers for the same logical source 
in the same query
+   *                           will be given the same checkpointLocation.
+   * @param options the options for the returned data source reader, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  MicroBatchReader createMicroBatchReader(
+      Optional<StructType> schema,
+      String checkpointLocation,
+      DataSourceV2Options options);
+}
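
The lifecycle described in the javadoc above, written out as a driver-side sketch. This is not the real engine code (that lives in MicroBatchExecution); the checkpoint path, the three-iteration loop, and the discarded result of createReadTasks() are placeholders:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

class MicroBatchLifecycleSketch {
  static void run(MicroBatchReadSupport source, DataSourceV2Options options) {
    MicroBatchReader reader = source.createMicroBatchReader(
        Optional.empty(),                      // no user-provided schema
        "/tmp/checkpoints/query-1/sources/0",  // hypothetical checkpoint path
        options);

    Optional<Offset> lastEnd = Optional.empty();
    for (int batch = 0; batch < 3; batch++) {  // placeholder termination condition
      reader.setOffsetRange(lastEnd, Optional.empty());  // empty end: "all available"
      Offset end = reader.getEndOffset();
      // createReadTasks() is inherited from DataSourceV2Reader; the engine
      // would schedule the returned tasks on executors (elided here).
      reader.createReadTasks();
      reader.commit(end);  // everything up to `end` has been processed
      lastEnd = Optional.of(end);
    }
    reader.stop();  // stop() comes from BaseStreamingSource
  }
}
```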

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
new file mode 100644
index 0000000..b5e3e44
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement 
this interface to
+ * provide data writing ability and save the data from a microbatch to the 
data source.
+ */
[email protected]
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+  /**
+   * Creates an optional {@link DataSourceV2Writer} to save the data to this 
data source. Data
+   * sources can return None if there is no writing to be done.
+   *
+   * @param queryId A unique string for the writing query. It's possible that 
there are many writing
+   *                queries running at the same time, and the returned {@link 
DataSourceV2Writer}
+   *                can use this id to distinguish itself from others.
+   * @param epochId The unique numeric ID of the batch within this writing 
query. This is an
+   *                incrementing counter representing a consistent set of 
data; the same batch may
+   *                be started multiple times in failure recovery scenarios, 
but it will always
+   *                contain the same records.
+   * @param schema the schema of the data to be written.
+   * @param mode the output mode which determines what successive batch output 
means to this
+   *             sink, please refer to {@link OutputMode} for more details.
+   * @param options the options for the returned data source writer, which is 
an immutable
+   *                case-insensitive string-to-string map.
+   */
+  Optional<DataSourceV2Writer> createMicroBatchWriter(
+      String queryId,
+      long epochId,
+      StructType schema,
+      OutputMode mode,
+      DataSourceV2Options options);
+}
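
A sketch of the exactly-once pattern the epochId parameter enables: because a replayed batch always carries the same records, a sink that remembers committed epoch ids can skip the duplicate. `MyBatchWriter` and the commit-log lookup are hypothetical:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

public class MyMicroBatchSink implements MicroBatchWriteSupport {
  @Override
  public Optional<DataSourceV2Writer> createMicroBatchWriter(
      String queryId,
      long epochId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options) {
    if (alreadyCommitted(queryId, epochId)) {
      // The batch was fully committed before a restart; nothing left to write.
      return Optional.empty();
    }
    return Optional.of(new MyBatchWriter(queryId, epochId, schema, mode, options));
  }

  private boolean alreadyCommitted(String queryId, long epochId) {
    return false;  // placeholder: consult the sink's own commit log
  }
}
```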

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
new file mode 100644
index 0000000..ca9a290
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous 
processing mode.
+ */
+public interface ContinuousDataReader<T> extends DataReader<T> {
+    /**
+     * Get the offset of the current record, or the start offset if no records 
have been read.
+     *
+     * The execution engine will call this method along with get() to keep 
track of the current
+     * offset. When an epoch ends, the offset of the previous record in each 
partition will be saved
+     * as a restart checkpoint.
+     */
+    PartitionOffset getOffset();
+}
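
A sketch of the contract: getOffset() must report the position of the record most recently returned by get(), or the start position before any call to next(). `LongPartitionOffset` is a hypothetical implementation of the PartitionOffset marker that appears later in this diff:

```java
import java.io.IOException;

import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousDataReader;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

public class CountingDataReader implements ContinuousDataReader<Long> {

  /** Hypothetical per-partition offset: a single long position. */
  public static class LongPartitionOffset implements PartitionOffset {
    public final long value;
    public LongPartitionOffset(long value) { this.value = value; }
  }

  private long current;  // position of the last record handed out

  public CountingDataReader(long startOffset) {
    this.current = startOffset;  // getOffset() before next() reports the start
  }

  @Override
  public boolean next() throws IOException {
    current += 1;  // a real reader would block here until data arrives
    return true;
  }

  @Override
  public Long get() {
    return current;
  }

  @Override
  public PartitionOffset getOffset() {
    // Called alongside get(); at each epoch boundary the engine checkpoints
    // the last such offset per partition for restart.
    return new LongPartitionOffset(current);
  }

  @Override
  public void close() throws IOException {
    // nothing to release in this sketch
  }
}
```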

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
new file mode 100644
index 0000000..f0b2058
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can 
implement this
+ * interface to allow reading in a continuous processing mode stream.
+ *
+ * Implementations must ensure each read task output is a {@link 
ContinuousDataReader}.
+ */
+public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
+    /**
+     * Merge offsets coming from {@link ContinuousDataReader} instances in 
each partition to
+     * a single global offset.
+     */
+    Offset mergeOffsets(PartitionOffset[] offsets);
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Set the desired start offset for read tasks created from this reader. 
The scan will start
+     * from the first record after the provided offset, or from an 
implementation-defined inferred
+     * starting point if no offset is provided.
+     */
+    void setOffset(Optional<Offset> start);
+
+    /**
+     * Return the specified or inferred start offset for this reader.
+     *
+     * @throws IllegalStateException if setOffset has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * The execution engine will call this method in every epoch to determine 
if new read tasks need
+     * to be generated, which may be required if for example the underlying 
source system has had
+     * partitions added or removed.
+     *
+     * If true, the query will be shut down and restarted with a new reader.
+     */
+    default boolean needsReconfiguration() {
+        return false;
+    }
+
+    /**
+     * Informs the source that Spark has completed processing all data for 
offsets less than or
+     * equal to `end` and will only request offsets greater than `end` in the 
future.
+     */
+    void commit(Offset end);
+}
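
A sketch of the offset-handling half of this interface; the scan-building methods inherited from DataSourceV2Reader are left abstract. `CountingDataReader.LongPartitionOffset` is the hypothetical class from the reader sketch above, `MyGlobalOffset` stands for an Offset subclass holding one position per partition, and `earliestOffset()` is a hypothetical probe:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

public abstract class OffsetTrackingReader implements ContinuousReader {
  private Offset start;

  @Override
  public Offset mergeOffsets(PartitionOffset[] offsets) {
    // Collapse one offset per partition into a single global offset.
    long[] positions = new long[offsets.length];
    for (int i = 0; i < offsets.length; i++) {
      positions[i] = ((CountingDataReader.LongPartitionOffset) offsets[i]).value;
    }
    return new MyGlobalOffset(positions);
  }

  @Override
  public Offset deserializeOffset(String json) {
    return MyGlobalOffset.fromJson(json);  // hypothetical inverse of Offset#json()
  }

  @Override
  public void setOffset(Optional<Offset> start) {
    // empty() means "infer a starting point"; this sketch falls back to a
    // hypothetical earliest position.
    this.start = start.orElseGet(this::earliestOffset);
  }

  @Override
  public Offset getStartOffset() {
    if (start == null) {
      throw new IllegalStateException("setOffset has not been called");
    }
    return start;
  }

  @Override
  public void commit(Offset end) {
    // All data at or before `end` is processed; the source may discard it.
  }

  /** Hypothetical probe for the earliest position available in the source. */
  protected abstract Offset earliestOffset();
}
```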

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
new file mode 100644
index 0000000..70ff756
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can 
implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ */
+public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
+    /**
+     * Set the desired offset range for read tasks created from this reader. 
Read tasks will
+     * generate only data within (`start`, `end`]; that is, from the first 
record after `start` to
+     * the record with offset `end`.
+     *
+     * @param start The initial offset to scan from. If not specified, scan 
from an
+     *              implementation-specified start point, such as the earliest 
available record.
+     * @param end The last offset to include in the scan. If not specified, 
scan up to an
+     *            implementation-defined endpoint, such as the last available 
offset
+     *            or the start offset plus a target batch size.
+     */
+    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
+
+    /**
+     * Returns the specified (if explicitly set through setOffsetRange) or 
inferred start offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * Return the specified (if explicitly set through setOffsetRange) or 
inferred end offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getEndOffset();
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined 
offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid 
offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Informs the source that Spark has completed processing all data for 
offsets less than or
+     * equal to `end` and will only request offsets greater than `end` in the 
future.
+     */
+    void commit(Offset end);
+}
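
The (`start`, `end`] semantics above leave both endpoints optional; a sketch of the inference a reader might do, with the IllegalStateException behavior the javadoc specifies. `earliest()` and `latest()` are hypothetical probes of the underlying source:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

public abstract class RangeTrackingReader implements MicroBatchReader {
  private Offset start;
  private Offset end;

  @Override
  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
    this.start = start.orElseGet(this::earliest);  // default: earliest record
    this.end = end.orElseGet(this::latest);        // default: all available data
  }

  @Override
  public Offset getStartOffset() {
    if (start == null) {
      throw new IllegalStateException("setOffsetRange has not been called");
    }
    return start;
  }

  @Override
  public Offset getEndOffset() {
    if (end == null) {
      throw new IllegalStateException("setOffsetRange has not been called");
    }
    return end;
  }

  /** Hypothetical probes of the underlying source. */
  protected abstract Offset earliest();
  protected abstract Offset latest();
}
```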

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
new file mode 100644
index 0000000..517fdab
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+/**
+ * An abstract representation of progress through a [[MicroBatchReader]] or 
[[ContinuousReader]].
+ * During execution, Offsets provided by the data source implementation will 
be logged and used as
+ * restart checkpoints. Sources should provide an Offset implementation which 
they can use to
+ * reconstruct the stream position where the offset was taken.
+ */
+public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
+    /**
+     * A JSON-serialized representation of an Offset that is
+     * used for saving offsets to the offset log.
+     * Note: We assume that equivalent/equal offsets serialize to
+     * identical JSON strings.
+     *
+     * @return JSON string encoding
+     */
+    public abstract String json();
+
+    /**
+     * Equality based on JSON string representation. We leverage the
+     * JSON representation for normalization between the Offset's
+     * in memory and on disk representations.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
+            return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return this.json().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return this.json();
+    }
+}
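
A minimal concrete subclass (hypothetical, not part of this commit). Note that equals() and hashCode() above are derived from json(), so equal positions must serialize to identical strings:

```java
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

public class SimpleLongOffset extends Offset {
  private final long position;

  public SimpleLongOffset(long position) {
    this.position = position;
  }

  @Override
  public String json() {
    // A bare number is valid JSON, and equal positions always produce the
    // same string, as the base class requires.
    return Long.toString(position);
  }
}
```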

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
new file mode 100644
index 0000000..729a612
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import java.io.Serializable;
+
+/**
+ * Used for per-partition offsets in continuous processing. ContinuousReader 
implementations will
+ * provide a method to merge these into a global Offset.
+ *
+ * These offsets must be serializable.
+ */
+public interface PartitionOffset extends Serializable {
+    
+}
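
Since the interface is an empty serializable marker, an implementation is just a value class. A hypothetical Kafka-flavored example:

```java
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

public class TopicPartitionOffset implements PartitionOffset {
  public final String topic;
  public final int partition;
  public final long offset;

  public TopicPartitionOffset(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }
}
```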

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
new file mode 100644
index 0000000..723395b
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceV2Writer} for use with continuous stream processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousWriter extends DataSourceV2Writer {
+  /**
+   * Commits this writing job for the specified epoch with a list of commit messages. The commit
+   * messages are collected from successful data writers and are produced by
+   * {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+   */
+  void commit(long epochId, WriterCommitMessage[] messages);
+
+  default void commit(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+       "Commit without epoch should not be called with ContinuousWriter");
+  }
+}
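
The epoch-taking commit is the only addition over DataSourceV2Writer, and the inherited no-arg commit is deliberately unsupported. A sketch of how a sink might implement it against the v2 writer API on this branch (MySinkContinuousWriter and its in-memory epoch log are hypothetical):

```scala
import scala.collection.mutable

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}

class MySinkContinuousWriter extends ContinuousWriter {
  // Task-side writer creation is omitted from this sketch.
  override def createWriterFactory(): DataWriterFactory[Row] = ???

  // Hypothetical record of committed epochs, keyed by epochId so a retried
  // epoch commits idempotently (the basis for exactly-once delivery).
  private val committed = mutable.Map.empty[Long, Array[WriterCommitMessage]]

  // Called once per epoch, with one message per successful data writer.
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit =
    committed.getOrElseUpdate(epochId, messages)

  // Invoked if commit throws; cleanup of the epoch's partial output goes here.
  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}
```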

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
deleted file mode 100644
index 618f47e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A {@link DataSourceV2Writer} for use with continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousWriter extends DataSourceV2Writer {
-  /**
-   * Commits this writing job for the specified epoch with a list of commit messages. The commit
-   * messages are collected from successful data writers and are produced by
-   * {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to have been
-   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
-   */
-  void commit(long epochId, WriterCommitMessage[] messages);
-
-  default void commit(WriterCommitMessage[] messages) {
-    throw new UnsupportedOperationException(
-       "Commit without epoch should not be called with ContinuousWriter");
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index e4fca1b..49c506b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousDataSourceRDD, ContinuousExecution, EpochCoordinatorRef, SetReaderPartitions}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 1862da8..f0bdf84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 20f9810..9a7a13f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index 3f85fa9..d02cf88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamR
 import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader}
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{ManualClock, SystemClock}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
index 65d6d18..261d69b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
@@ -23,7 +23,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.sources.v2
 
 case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, ValueRunTimeMsPair])
-  extends v2.reader.Offset {
+  extends v2.streaming.reader.Offset {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
   override val json = Serialization.write(partitionToValueAndRunTimeMs)
 }
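
RateStreamOffset shows the intended use of the relocated Offset class: delegate json() to a serializer over the backing state and let the checkpoint log carry the string. A round-trip sketch, assuming ValueRunTimeMsPair(value, runTimeMs) as defined alongside this class (the deserialization step mirrors what the rate source's reader does elsewhere in this diff):

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

implicit val formats: DefaultFormats = DefaultFormats

// Offsets with identical per-partition state serialize to the same JSON
// and therefore compare equal.
val a = RateStreamOffset(Map(0 -> ValueRunTimeMsPair(42L, 1000L)))
val b = RateStreamOffset(Map(0 -> ValueRunTimeMsPair(42L, 1000L)))
assert(a.json == b.json && a == b)

// On restart, the checkpointed JSON reconstructs the offset.
val restored = RateStreamOffset(
  Serialization.read[Map[Int, ValueRunTimeMsPair]](a.json))
assert(restored == a)
```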

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 0ca2e78..a9d50e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2}
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
index 89fb2ac..d79e4bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, Ro
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, PartitionOffset}
 import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 1c35b06..2843ab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,9 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{Clock, Utils}

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 89a8562..c9aa78a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
 import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
 
 case class ContinuousRateStreamPartitionOffset(

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 7f1e8ab..98017c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -26,8 +26,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
 import org.apache.spark.util.RpcUtils
 
 private[continuous] sealed trait EpochCoordinatorMessage extends Serializable

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
index 1c66aed..97bada08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
 import org.apache.spark.sql.sources.v2.DataSourceV2Options
 import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
 import org.apache.spark.util.SystemClock
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 972248d..da7c31c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -29,7 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f17935e..acd5ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index e808ffa..b508f44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.ContinuousWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 4868ba4..e6cdc06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
index ceba27b..03d0f63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.reader.Offset
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.util.ManualClock
 

http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index dc833b2..e11705a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
 import org.apache.spark.sql.streaming.StreamTest
 
 class RateSourceV2Suite extends StreamTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
