Repository: spark
Updated Branches:
refs/heads/master 568398452 -> 32ec269d0
[SPARK-22909][SS] Move Structured Streaming v2 APIs to streaming folder
## What changes were proposed in this pull request?
This PR moves the Structured Streaming v2 APIs to the streaming folder, as follows:
```
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming
├── ContinuousReadSupport.java
├── ContinuousWriteSupport.java
├── MicroBatchReadSupport.java
├── MicroBatchWriteSupport.java
├── reader
│   ├── ContinuousDataReader.java
│   ├── ContinuousReader.java
│   ├── MicroBatchReader.java
│   ├── Offset.java
│   └── PartitionOffset.java
└── writer
    └── ContinuousWriter.java
```
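
For illustration (not part of this commit message): the class names are unchanged, so downstream connector code only needs its import paths updated. A hypothetical connector's before/after:

```java
// Before this patch (hypothetical connector code):
// import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
// import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
// import org.apache.spark.sql.sources.v2.reader.Offset;

// After this patch, the same classes live under the streaming subpackage:
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;
```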
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <[email protected]>
Closes #20093 from zsxwing/move.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32ec269d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32ec269d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32ec269d
Branch: refs/heads/master
Commit: 32ec269d08313720aae3b47cce2f5e9c19702811
Parents: 5683984
Author: Shixiong Zhu <[email protected]>
Authored: Thu Dec 28 12:35:17 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu Dec 28 12:35:17 2017 +0800
----------------------------------------------------------------------
.../sql/sources/v2/ContinuousReadSupport.java | 42 -----------
.../sql/sources/v2/ContinuousWriteSupport.java | 54 --------------
.../sql/sources/v2/MicroBatchReadSupport.java | 52 --------------
.../sql/sources/v2/MicroBatchWriteSupport.java | 58 ---------------
.../sources/v2/reader/ContinuousDataReader.java | 36 ----------
.../sql/sources/v2/reader/ContinuousReader.java | 74 --------------------
.../sql/sources/v2/reader/MicroBatchReader.java | 70 ------------------
.../spark/sql/sources/v2/reader/Offset.java | 60 ----------------
.../sql/sources/v2/reader/PartitionOffset.java | 30 --------
.../v2/streaming/ContinuousReadSupport.java | 43 ++++++++++++
.../v2/streaming/ContinuousWriteSupport.java | 56 +++++++++++++++
.../v2/streaming/MicroBatchReadSupport.java | 54 ++++++++++++++
.../v2/streaming/MicroBatchWriteSupport.java | 60 ++++++++++++++++
.../streaming/reader/ContinuousDataReader.java | 34 +++++++++
.../v2/streaming/reader/ContinuousReader.java | 74 ++++++++++++++++++++
.../v2/streaming/reader/MicroBatchReader.java | 70 ++++++++++++++++++
.../sql/sources/v2/streaming/reader/Offset.java | 60 ++++++++++++++++
.../v2/streaming/reader/PartitionOffset.java | 30 ++++++++
.../v2/streaming/writer/ContinuousWriter.java | 44 ++++++++++++
.../sql/sources/v2/writer/ContinuousWriter.java | 41 -----------
.../datasources/v2/DataSourceV2ScanExec.scala | 1 +
.../datasources/v2/WriteToDataSourceV2.scala | 1 +
.../streaming/MicroBatchExecution.scala | 2 +-
.../streaming/RateSourceProvider.scala | 3 +-
.../execution/streaming/RateStreamOffset.scala | 2 +-
.../execution/streaming/StreamingRelation.scala | 3 +-
.../ContinuousDataSourceRDDIter.scala | 1 +
.../continuous/ContinuousExecution.scala | 7 +-
.../continuous/ContinuousRateStreamSource.scala | 3 +-
.../streaming/continuous/EpochCoordinator.scala | 5 +-
.../streaming/sources/RateStreamSourceV2.scala | 1 +
.../execution/streaming/sources/memoryV2.scala | 4 +-
.../spark/sql/streaming/DataStreamReader.scala | 3 +-
.../sql/streaming/StreamingQueryManager.scala | 2 +-
.../execution/streaming/OffsetSeqLogSuite.scala | 1 -
.../execution/streaming/RateSourceSuite.scala | 1 -
.../execution/streaming/RateSourceV2Suite.scala | 3 +-
37 files changed, 552 insertions(+), 533 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
deleted file mode 100644
index ae4f858..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousReadSupport.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.sql.sources.v2.reader.ContinuousReader;
-import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
- * provide data reading ability for continuous stream processing.
- */
-public interface ContinuousReadSupport extends DataSourceV2 {
- /**
- * Creates a {@link ContinuousReader} to scan the data from this data source.
- *
- * @param schema the user provided schema, or empty() if none was provided
- * @param checkpointLocation a path to Hadoop FS scratch space that can be
used for failure
- * recovery. Readers for the same logical source
in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is
an immutable
- * case-insensitive string-to-string map.
- */
- ContinuousReader createContinuousReader(Optional<StructType> schema, String
checkpointLocation, DataSourceV2Options options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
deleted file mode 100644
index 362d5f5..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/ContinuousWriteSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.ContinuousWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
- * provide data writing ability for continuous stream processing.
- */
[email protected]
-public interface ContinuousWriteSupport extends BaseStreamingSink {
-
- /**
- * Creates an optional {@link ContinuousWriter} to save the data to this
data source. Data
- * sources can return None if there is no writing needed to be done.
- *
- * @param queryId A unique string for the writing query. It's possible
that there are many writing
- * queries running at the same time, and the returned
{@link DataSourceV2Writer}
- * can use this id to distinguish itself from others.
- * @param schema the schema of the data to be written.
- * @param mode the output mode which determines what successive epoch
output means to this
- * sink, please refer to {@link OutputMode} for more details.
- * @param options the options for the returned data source writer, which
is an immutable
- * case-insensitive string-to-string map.
- */
- Optional<ContinuousWriter> createContinuousWriter(
- String queryId,
- StructType schema,
- OutputMode mode,
- DataSourceV2Options options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
deleted file mode 100644
index 442cad0..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.MicroBatchReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
- * provide streaming micro-batch data reading ability.
- */
[email protected]
-public interface MicroBatchReadSupport extends DataSourceV2 {
- /**
- * Creates a {@link MicroBatchReader} to read batches of data from this data
source in a
- * streaming query.
- *
- * The execution engine will create a micro-batch reader at the start of a
streaming query,
- * alternate calls to setOffsetRange and createReadTasks for each batch to
process, and then
- * call stop() when the execution is complete. Note that a single query may
have multiple
- * executions due to restart or failure recovery.
- *
- * @param schema the user provided schema, or empty() if none was provided
- * @param checkpointLocation a path to Hadoop FS scratch space that can be
used for failure
- * recovery. Readers for the same logical source
in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is
an immutable
- * case-insensitive string-to-string map.
- */
- MicroBatchReader createMicroBatchReader(
- Optional<StructType> schema,
- String checkpointLocation,
- DataSourceV2Options options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
deleted file mode 100644
index 6364077..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchWriteSupport.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
- * provide data writing ability and save the data from a microbatch to the
data source.
- */
[email protected]
-public interface MicroBatchWriteSupport extends BaseStreamingSink {
-
- /**
- * Creates an optional {@link DataSourceV2Writer} to save the data to this
data source. Data
- * sources can return None if there is no writing needed to be done.
- *
- * @param queryId A unique string for the writing query. It's possible that
there are many writing
- * queries running at the same time, and the returned {@link
DataSourceV2Writer}
- * can use this id to distinguish itself from others.
- * @param epochId The unique numeric ID of the batch within this writing
query. This is an
- * incrementing counter representing a consistent set of
data; the same batch may
- * be started multiple times in failure recovery scenarios,
but it will always
- * contain the same records.
- * @param schema the schema of the data to be written.
- * @param mode the output mode which determines what successive batch output
means to this
- * sink, please refer to {@link OutputMode} for more details.
- * @param options the options for the returned data source writer, which is
an immutable
- * case-insensitive string-to-string map.
- */
- Optional<DataSourceV2Writer> createMicroBatchWriter(
- String queryId,
- long epochId,
- StructType schema,
- OutputMode mode,
- DataSourceV2Options options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
deleted file mode 100644
index 11b99a9..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
-
-import java.io.IOException;
-
-/**
- * A variation on {@link DataReader} for use with streaming in continuous
processing mode.
- */
-public interface ContinuousDataReader<T> extends DataReader<T> {
- /**
- * Get the offset of the current record, or the start offset if no records
have been read.
- *
- * The execution engine will call this method along with get() to keep
track of the current
- * offset. When an epoch ends, the offset of the previous record in each
partition will be saved
- * as a restart checkpoint.
- */
- PartitionOffset getOffset();
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
deleted file mode 100644
index 34141d6..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.sql.sources.v2.reader.PartitionOffset;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can
implement this
- * interface to allow reading in a continuous processing mode stream.
- *
- * Implementations must ensure each read task output is a {@link
ContinuousDataReader}.
- */
-public interface ContinuousReader extends BaseStreamingSource,
DataSourceV2Reader {
- /**
- * Merge offsets coming from {@link ContinuousDataReader} instances in
each partition to
- * a single global offset.
- */
- Offset mergeOffsets(PartitionOffset[] offsets);
-
- /**
- * Deserialize a JSON string into an Offset of the implementation-defined
offset type.
- * @throws IllegalArgumentException if the JSON does not encode a valid
offset for this reader
- */
- Offset deserializeOffset(String json);
-
- /**
- * Set the desired start offset for read tasks created from this reader.
The scan will start
- * from the first record after the provided offset, or from an
implementation-defined inferred
- * starting point if no offset is provided.
- */
- void setOffset(Optional<Offset> start);
-
- /**
- * Return the specified or inferred start offset for this reader.
- *
- * @throws IllegalStateException if setOffset has not been called
- */
- Offset getStartOffset();
-
- /**
- * The execution engine will call this method in every epoch to determine
if new read tasks need
- * to be generated, which may be required if for example the underlying
source system has had
- * partitions added or removed.
- *
- * If true, the query will be shut down and restarted with a new reader.
- */
- default boolean needsReconfiguration() {
- return false;
- }
-
- /**
- * Informs the source that Spark has completed processing all data for
offsets less than or
- * equal to `end` and will only request offsets greater than `end` in the
future.
- */
- void commit(Offset end);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
deleted file mode 100644
index bd15c07..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReader.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.sql.sources.v2.reader.Offset;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can
implement this
- * interface to indicate they allow micro-batch streaming reads.
- */
-public interface MicroBatchReader extends DataSourceV2Reader,
BaseStreamingSource {
- /**
- * Set the desired offset range for read tasks created from this reader.
Read tasks will
- * generate only data within (`start`, `end`]; that is, from the first
record after `start` to
- * the record with offset `end`.
- *
- * @param start The initial offset to scan from. If not specified, scan
from an
- * implementation-specified start point, such as the earliest
available record.
- * @param end The last offset to include in the scan. If not specified,
scan up to an
- * implementation-defined endpoint, such as the last available
offset
- * or the start offset plus a target batch size.
- */
- void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
-
- /**
- * Returns the specified (if explicitly set through setOffsetRange) or
inferred start offset
- * for this reader.
- *
- * @throws IllegalStateException if setOffsetRange has not been called
- */
- Offset getStartOffset();
-
- /**
- * Return the specified (if explicitly set through setOffsetRange) or
inferred end offset
- * for this reader.
- *
- * @throws IllegalStateException if setOffsetRange has not been called
- */
- Offset getEndOffset();
-
- /**
- * Deserialize a JSON string into an Offset of the implementation-defined
offset type.
- * @throws IllegalArgumentException if the JSON does not encode a valid
offset for this reader
- */
- Offset deserializeOffset(String json);
-
- /**
- * Informs the source that Spark has completed processing all data for
offsets less than or
- * equal to `end` and will only request offsets greater than `end` in the
future.
- */
- void commit(Offset end);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
deleted file mode 100644
index ce1c489..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Offset.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-/**
- * An abstract representation of progress through a [[MicroBatchReader]] or
[[ContinuousReader]].
- * During execution, Offsets provided by the data source implementation will
be logged and used as
- * restart checkpoints. Sources should provide an Offset implementation which
they can use to
- * reconstruct the stream position where the offset was taken.
- */
-public abstract class Offset extends
org.apache.spark.sql.execution.streaming.Offset {
- /**
- * A JSON-serialized representation of an Offset that is
- * used for saving offsets to the offset log.
- * Note: We assume that equivalent/equal offsets serialize to
- * identical JSON strings.
- *
- * @return JSON string encoding
- */
- public abstract String json();
-
- /**
- * Equality based on JSON string representation. We leverage the
- * JSON representation for normalization between an Offset's
- * in-memory and on-disk representations.
- */
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
- return
this.json().equals(((org.apache.spark.sql.execution.streaming.Offset)
obj).json());
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return this.json().hashCode();
- }
-
- @Override
- public String toString() {
- return this.json();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
deleted file mode 100644
index 07826b6..0000000
---
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/PartitionOffset.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import java.io.Serializable;
-
-/**
- * Used for per-partition offsets in continuous processing. ContinuousReader
implementations will
- * provide a method to merge these into a global Offset.
- *
- * These offsets must be serializable.
- */
-public interface PartitionOffset extends Serializable {
-
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
new file mode 100644
index 0000000..8837bae
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
+ * provide data reading ability for continuous stream processing.
+ */
+public interface ContinuousReadSupport extends DataSourceV2 {
+ /**
+ * Creates a {@link ContinuousReader} to scan the data from this data source.
+ *
+ * @param schema the user provided schema, or empty() if none was provided
+ * @param checkpointLocation a path to Hadoop FS scratch space that can be
used for failure
+ * recovery. Readers for the same logical source
in the same query
+ * will be given the same checkpointLocation.
+ * @param options the options for the returned data source reader, which is
an immutable
+ * case-insensitive string-to-string map.
+ */
+ ContinuousReader createContinuousReader(Optional<StructType> schema, String
checkpointLocation, DataSourceV2Options options);
+}
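
For illustration only (not part of this patch): a minimal sketch of a source opting into continuous reads at the new package location. `MyStreamSource` is a hypothetical name, and `MyContinuousReader` stands for a ContinuousReader implementation like the one sketched after that interface below.

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.types.StructType;

// Hypothetical source that opts into continuous processing.
public class MyStreamSource implements ContinuousReadSupport {
  @Override
  public ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    // A real reader would capture the user-provided schema (if any), the
    // per-source checkpointLocation, and the case-insensitive options map.
    return new MyContinuousReader();
  }
}
```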
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
new file mode 100644
index 0000000..ec15e43
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousWriteSupport.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
+ * provide data writing ability for continuous stream processing.
+ */
[email protected]
+public interface ContinuousWriteSupport extends BaseStreamingSink {
+
+ /**
+ * Creates an optional {@link ContinuousWriter} to save the data to this
data source. Data
+ * sources can return None if there is no writing needed to be done.
+ *
+ * @param queryId A unique string for the writing query. It's possible
that there are many writing
+ * queries running at the same time, and the returned
{@link DataSourceV2Writer}
+ * can use this id to distinguish itself from others.
+ * @param schema the schema of the data to be written.
+ * @param mode the output mode which determines what successive epoch
output means to this
+ * sink, please refer to {@link OutputMode} for more details.
+ * @param options the options for the returned data source writer, which
is an immutable
+ * case-insensitive string-to-string map.
+ */
+ Optional<ContinuousWriter> createContinuousWriter(
+ String queryId,
+ StructType schema,
+ OutputMode mode,
+ DataSourceV2Options options);
+}
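
Again purely illustrative: a hypothetical sink (`MyStreamSink`) wiring the relocated ContinuousWriteSupport entry point to a ContinuousWriter like the one sketched near the end of this diff.

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport;
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

// Hypothetical sink that supports continuous-mode writes.
public class MyStreamSink implements ContinuousWriteSupport {
  @Override
  public Optional<ContinuousWriter> createContinuousWriter(
      String queryId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options) {
    if (!mode.equals(OutputMode.Append())) {
      // Returning empty means there is no writing to be done for this mode.
      return Optional.empty();
    }
    return Optional.of(new MyContinuousWriter(queryId, schema));
  }
}
```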
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
new file mode 100644
index 0000000..3c87a3d
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
+ * provide streaming micro-batch data reading ability.
+ */
[email protected]
+public interface MicroBatchReadSupport extends DataSourceV2 {
+ /**
+ * Creates a {@link MicroBatchReader} to read batches of data from this data
source in a
+ * streaming query.
+ *
+ * The execution engine will create a micro-batch reader at the start of a
streaming query,
+ * alternate calls to setOffsetRange and createReadTasks for each batch to
process, and then
+ * call stop() when the execution is complete. Note that a single query may
have multiple
+ * executions due to restart or failure recovery.
+ *
+ * @param schema the user provided schema, or empty() if none was provided
+ * @param checkpointLocation a path to Hadoop FS scratch space that can be
used for failure
+ * recovery. Readers for the same logical source
in the same query
+ * will be given the same checkpointLocation.
+ * @param options the options for the returned data source reader, which is
an immutable
+ * case-insensitive string-to-string map.
+ */
+ MicroBatchReader createMicroBatchReader(
+ Optional<StructType> schema,
+ String checkpointLocation,
+ DataSourceV2Options options);
+}
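
A hypothetical sketch of the micro-batch entry point, restating the lifecycle from the Javadoc above; `MyBatchedSource` and `MyMicroBatchReader` (sketched after the MicroBatchReader interface below) are invented names.

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

// Hypothetical source that opts into micro-batch reads.
public class MyBatchedSource implements MicroBatchReadSupport {
  @Override
  public MicroBatchReader createMicroBatchReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    // One reader serves the whole run: the engine alternates
    // setOffsetRange(...) and createReadTasks() per batch, then calls stop().
    return new MyMicroBatchReader();
  }
}
```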
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
new file mode 100644
index 0000000..b5e3e44
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchWriteSupport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement
this interface to
+ * provide data writing ability and save the data from a microbatch to the
data source.
+ */
[email protected]
+public interface MicroBatchWriteSupport extends BaseStreamingSink {
+
+ /**
+ * Creates an optional {@link DataSourceV2Writer} to save the data to this
data source. Data
+ * sources can return None if there is no writing needed to be done.
+ *
+ * @param queryId A unique string for the writing query. It's possible that
there are many writing
+ * queries running at the same time, and the returned {@link
DataSourceV2Writer}
+ * can use this id to distinguish itself from others.
+ * @param epochId The unique numeric ID of the batch within this writing
query. This is an
+ * incrementing counter representing a consistent set of
data; the same batch may
+ * be started multiple times in failure recovery scenarios,
but it will always
+ * contain the same records.
+ * @param schema the schema of the data to be written.
+ * @param mode the output mode which determines what successive batch output
means to this
+ * sink, please refer to {@link OutputMode} for more details.
+ * @param options the options for the returned data source writer, which is
an immutable
+ * case-insensitive string-to-string map.
+ */
+ Optional<DataSourceV2Writer> createMicroBatchWriter(
+ String queryId,
+ long epochId,
+ StructType schema,
+ OutputMode mode,
+ DataSourceV2Options options);
+}
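
An illustrative sketch, assuming the DataSourceV2Writer members (createWriterFactory/commit/abort) as they exist at this commit; the idempotence comment restates the epochId contract above, and `MyBatchedSink` is hypothetical.

```java
import java.util.Optional;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

// Hypothetical sink that commits one micro-batch per (queryId, epochId).
public class MyBatchedSink implements MicroBatchWriteSupport {
  @Override
  public Optional<DataSourceV2Writer> createMicroBatchWriter(
      String queryId,
      long epochId,
      StructType schema,
      OutputMode mode,
      DataSourceV2Options options) {
    return Optional.of(new DataSourceV2Writer() {
      @Override
      public DataWriterFactory<Row> createWriterFactory() {
        // Elided: a serializable factory producing per-partition DataWriters.
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      public void commit(WriterCommitMessage[] messages) {
        // Record (queryId, epochId) so a replayed batch commits as a no-op:
        // the same epochId always carries the same records.
      }

      @Override
      public void abort(WriterCommitMessage[] messages) { }
    });
  }
}
```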
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
new file mode 100644
index 0000000..ca9a290
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous
processing mode.
+ */
+public interface ContinuousDataReader<T> extends DataReader<T> {
+ /**
+ * Get the offset of the current record, or the start offset if no records
have been read.
+ *
+ * The execution engine will call this method along with get() to keep
track of the current
+ * offset. When an epoch ends, the offset of the previous record in each
partition will be saved
+ * as a restart checkpoint.
+ */
+ PartitionOffset getOffset();
+}
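
Illustrative only: a per-partition continuous reader over a hypothetical unbounded counter. This assumes DataReader's next()/get()/close() members as at this commit; `CounterPartitionOffset` is the PartitionOffset sketch shown after that interface below.

```java
import java.io.IOException;

import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousDataReader;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Hypothetical reader emitting an unbounded stream of longs for one partition.
public class CounterDataReader implements ContinuousDataReader<Long> {
  private final int partition;
  private long current;

  public CounterDataReader(int partition, long startValue) {
    this.partition = partition;
    this.current = startValue;
  }

  @Override
  public boolean next() throws IOException {
    // A continuous source blocks (or polls) until the next record exists.
    current += 1;
    return true;
  }

  @Override
  public Long get() {
    return current;
  }

  @Override
  public PartitionOffset getOffset() {
    // Called alongside get(); must identify the record get() just returned.
    return new CounterPartitionOffset(partition, current);
  }

  @Override
  public void close() throws IOException {
    // Release sockets/files here in a real source.
  }
}
```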
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
new file mode 100644
index 0000000..f0b2058
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can
implement this
+ * interface to allow reading in a continuous processing mode stream.
+ *
+ * Implementations must ensure each read task output is a {@link
ContinuousDataReader}.
+ */
+public interface ContinuousReader extends BaseStreamingSource,
DataSourceV2Reader {
+ /**
+ * Merge offsets coming from {@link ContinuousDataReader} instances in
each partition to
+ * a single global offset.
+ */
+ Offset mergeOffsets(PartitionOffset[] offsets);
+
+ /**
+ * Deserialize a JSON string into an Offset of the implementation-defined
offset type.
+ * @throws IllegalArgumentException if the JSON does not encode a valid
offset for this reader
+ */
+ Offset deserializeOffset(String json);
+
+ /**
+ * Set the desired start offset for read tasks created from this reader.
The scan will start
+ * from the first record after the provided offset, or from an
implementation-defined inferred
+ * starting point if no offset is provided.
+ */
+ void setOffset(Optional<Offset> start);
+
+ /**
+ * Return the specified or inferred start offset for this reader.
+ *
+ * @throws IllegalStateException if setOffset has not been called
+ */
+ Offset getStartOffset();
+
+ /**
+ * The execution engine will call this method in every epoch to determine
if new read tasks need
+ * to be generated, which may be required if for example the underlying
source system has had
+ * partitions added or removed.
+ *
+ * If true, the query will be shut down and restarted with a new reader.
+ */
+ default boolean needsReconfiguration() {
+ return false;
+ }
+
+ /**
+ * Informs the source that Spark has completed processing all data for
offsets less than or
+ * equal to `end` and will only request offsets greater than `end` in the
future.
+ */
+ void commit(Offset end);
+}
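
A minimal sketch of the reader side, with assumptions flagged: readSchema()/createReadTasks() come from DataSourceV2Reader and stop() from BaseStreamingSource as they exist at this commit, and `CounterOffset`/`CounterPartitionOffset` are the hypothetical offset types sketched after the Offset and PartitionOffset interfaces below.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical continuous reader for the counter source.
public class MyContinuousReader implements ContinuousReader {
  private Offset start = new CounterOffset(0L);

  @Override
  public Offset mergeOffsets(PartitionOffset[] offsets) {
    // Fold per-partition positions into one global, JSON-serializable offset;
    // the minimum is the safe restart point across all partitions.
    long min = Long.MAX_VALUE;
    for (PartitionOffset po : offsets) {
      min = Math.min(min, ((CounterPartitionOffset) po).value);
    }
    return new CounterOffset(min);
  }

  @Override
  public Offset deserializeOffset(String json) {
    return new CounterOffset(Long.parseLong(json));
  }

  @Override
  public void setOffset(Optional<Offset> startOffset) {
    startOffset.ifPresent(o -> this.start = o);
  }

  @Override
  public Offset getStartOffset() {
    return start;
  }

  @Override
  public void commit(Offset end) {
    // Data at offsets <= end will never be requested again; a real source
    // could garbage-collect it here.
  }

  @Override
  public StructType readSchema() {
    return new StructType().add("value", DataTypes.LongType);
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // Elided: one ReadTask per partition, each producing a
    // ContinuousDataReader as required by the contract above.
    return Arrays.asList();
  }

  @Override
  public void stop() { }
}
```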
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
new file mode 100644
index 0000000..70ff756
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can
implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ */
+public interface MicroBatchReader extends DataSourceV2Reader,
BaseStreamingSource {
+ /**
+ * Set the desired offset range for read tasks created from this reader.
Read tasks will
+ * generate only data within (`start`, `end`]; that is, from the first
record after `start` to
+ * the record with offset `end`.
+ *
+ * @param start The initial offset to scan from. If not specified, scan
from an
+ * implementation-specified start point, such as the earliest
available record.
+ * @param end The last offset to include in the scan. If not specified,
scan up to an
+ * implementation-defined endpoint, such as the last available
offset
+ * or the start offset plus a target batch size.
+ */
+ void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
+
+ /**
+ * Returns the specified (if explicitly set through setOffsetRange) or
inferred start offset
+ * for this reader.
+ *
+ * @throws IllegalStateException if setOffsetRange has not been called
+ */
+ Offset getStartOffset();
+
+ /**
+ * Return the specified (if explicitly set through setOffsetRange) or
inferred end offset
+ * for this reader.
+ *
+ * @throws IllegalStateException if setOffsetRange has not been called
+ */
+ Offset getEndOffset();
+
+ /**
+ * Deserialize a JSON string into an Offset of the implementation-defined
offset type.
+ * @throws IllegalArgumentException if the JSON does not encode a valid
offset for this reader
+ */
+ Offset deserializeOffset(String json);
+
+ /**
+ * Informs the source that Spark has completed processing all data for
offsets less than or
+ * equal to `end` and will only request offsets greater than `end` in the
future.
+ */
+ void commit(Offset end);
+}
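
A comparable micro-batch sketch under the same assumptions (DataSourceV2Reader and BaseStreamingSource members as at this commit; `CounterOffset` is the hypothetical offset class below, and latestAvailable() is an invented stand-in for "last available record").

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.reader.ReadTask;
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical micro-batch reader over the same counter source.
public class MyMicroBatchReader implements MicroBatchReader {
  private Offset start;
  private Offset end;

  @Override
  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
    // Batch boundaries are (start, end]; infer either endpoint if absent.
    this.start = start.orElse(new CounterOffset(0L));
    this.end = end.orElse(new CounterOffset(latestAvailable()));
  }

  @Override
  public Offset getStartOffset() {
    if (start == null) throw new IllegalStateException("setOffsetRange not called");
    return start;
  }

  @Override
  public Offset getEndOffset() {
    if (end == null) throw new IllegalStateException("setOffsetRange not called");
    return end;
  }

  @Override
  public Offset deserializeOffset(String json) {
    return new CounterOffset(Long.parseLong(json));
  }

  @Override
  public void commit(Offset end) {
    // Data at offsets <= end will never be requested again and may be dropped.
  }

  @Override
  public StructType readSchema() {
    return new StructType().add("value", DataTypes.LongType);
  }

  @Override
  public List<ReadTask<Row>> createReadTasks() {
    // Elided: tasks that scan records with offsets in (start, end].
    return Arrays.asList();
  }

  @Override
  public void stop() { }

  private long latestAvailable() {
    return System.currentTimeMillis(); // invented stand-in for the last offset
  }
}
```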
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
new file mode 100644
index 0000000..517fdab
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+/**
+ * An abstract representation of progress through a [[MicroBatchReader]] or
[[ContinuousReader]].
+ * During execution, Offsets provided by the data source implementation will
be logged and used as
+ * restart checkpoints. Sources should provide an Offset implementation which
they can use to
+ * reconstruct the stream position where the offset was taken.
+ */
+public abstract class Offset extends
org.apache.spark.sql.execution.streaming.Offset {
+ /**
+ * A JSON-serialized representation of an Offset that is
+ * used for saving offsets to the offset log.
+ * Note: We assume that equivalent/equal offsets serialize to
+ * identical JSON strings.
+ *
+ * @return JSON string encoding
+ */
+ public abstract String json();
+
+ /**
+ * Equality based on JSON string representation. We leverage the
+ * JSON representation for normalization between an Offset's
+ * in-memory and on-disk representations.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
+ return
this.json().equals(((org.apache.spark.sql.execution.streaming.Offset)
obj).json());
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return this.json().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return this.json();
+ }
+}
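
A hypothetical concrete subclass used in the reader sketches above: since the base class derives equals/hashCode/toString from json(), the only obligation is that equal offsets serialize to identical JSON strings.

```java
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

// Hypothetical global offset for the counter source: just a long, so its
// JSON form is the decimal string and equal values serialize identically.
public class CounterOffset extends Offset {
  public final long value;

  public CounterOffset(long value) {
    this.value = value;
  }

  @Override
  public String json() {
    return Long.toString(value);
  }
}
```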
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
new file mode 100644
index 0000000..729a612
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.reader;
+
+import java.io.Serializable;
+
+/**
+ * Used for per-partition offsets in continuous processing. ContinuousReader
implementations will
+ * provide a method to merge these into a global Offset.
+ *
+ * These offsets must be serializable.
+ */
+public interface PartitionOffset extends Serializable {
+
+}
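
The matching hypothetical per-partition offset used by the CounterDataReader and mergeOffsets sketches above; it only needs to be Serializable.

```java
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Hypothetical per-partition position for the counter source.
// ContinuousReader.mergeOffsets folds an array of these into one Offset.
public class CounterPartitionOffset implements PartitionOffset {
  public final int partition;
  public final long value;

  public CounterPartitionOffset(int partition, long value) {
    this.partition = partition;
    this.value = value;
  }
}
```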
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
new file mode 100644
index 0000000..723395b
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/ContinuousWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.streaming.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceV2Writer} for use with continuous stream processing.
+ */
[email protected]
+public interface ContinuousWriter extends DataSourceV2Writer {
+ /**
+ * Commits this writing job for the specified epoch with a list of commit
messages. The commit
+ * messages are collected from successful data writers and are produced by
+ * {@link DataWriter#commit()}.
+ *
+ * If this method fails (by throwing an exception), this writing job is
considered to have been
+ * failed, and the execution engine will attempt to call {@link
#abort(WriterCommitMessage[])}.
+ */
+ void commit(long epochId, WriterCommitMessage[] messages);
+
+ default void commit(WriterCommitMessage[] messages) {
+ throw new UnsupportedOperationException(
+ "Commit without epoch should not be called with ContinuousWriter");
+ }
+}
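
To make the epoch-based contract concrete, here is an illustrative sketch; ExampleContinuousWriter is not in this patch, and the class is kept abstract so the inherited DataSourceV2Writer methods (such as createWriterFactory) are left to the concrete sink:

```java
import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Illustrative sketch, not part of this patch.
public abstract class ExampleContinuousWriter implements ContinuousWriter {
  @Override
  public void commit(long epochId, WriterCommitMessage[] messages) {
    // Publish the results of all data writers for this epoch. A robust sink
    // would typically make re-commits of an already-committed epoch no-ops.
    System.out.println("epoch " + epochId + ": " + messages.length + " writer commits");
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Best-effort cleanup of whatever the failed epoch may have written.
  }
}
```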
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
deleted file mode 100644
index 618f47e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/ContinuousWriter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A {@link DataSourceV2Writer} for use with continuous stream processing.
- */
[email protected]
-public interface ContinuousWriter extends DataSourceV2Writer {
- /**
- * Commits this writing job for the specified epoch with a list of commit messages. The commit
- * messages are collected from successful data writers and are produced by
- * {@link DataWriter#commit()}.
- *
- * If this method fails (by throwing an exception), this writing job is considered to have failed,
- * and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
- */
- void commit(long epochId, WriterCommitMessage[] messages);
-
- default void commit(WriterCommitMessage[] messages) {
- throw new UnsupportedOperationException(
- "Commit without epoch should not be called with ContinuousWriter");
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index e4fca1b..49c506b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousDataSourceRDD, ContinuousExecution, EpochCoordinatorRef, SetReaderPartitions}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
import org.apache.spark.sql.types.StructType
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 1862da8..f0bdf84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 20f9810..9a7a13f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.sources.v2.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
index 3f85fa9..d02cf88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamR
import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, MicroBatchReader}
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
import org.apache.spark.sql.types._
import org.apache.spark.util.{ManualClock, SystemClock}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
index 65d6d18..261d69b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateStreamOffset.scala
@@ -23,7 +23,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.sources.v2
case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, ValueRunTimeMsPair])
- extends v2.reader.Offset {
+ extends v2.streaming.reader.Offset {
implicit val defaultFormats: DefaultFormats = DefaultFormats
override val json = Serialization.write(partitionToValueAndRunTimeMs)
}
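
Note that under the new layout a source's global offset still only has to expose a JSON form via Offset#json(), as RateStreamOffset does above with json4s. A bare-bones variant, with hypothetical names not taken from this patch, could look like:

```java
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

// Hypothetical example, not part of this patch: the json() form is what gets
// logged to the offset checkpoint, so it should round-trip losslessly.
public class ExampleOffset extends Offset {
  private final long position;

  public ExampleOffset(long position) {
    this.position = position;
  }

  @Override
  public String json() {
    // Trivially simple on purpose; real sources (like RateStreamOffset)
    // serialize richer per-partition state.
    return Long.toString(position);
  }
}
```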
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 0ca2e78..a9d50e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2}
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
index 89fb2ac..d79e4bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDDIter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, Ro
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, PartitionOffset}
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{SystemClock, ThreadUtils}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 1c35b06..2843ab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -29,9 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, ContinuousWriteSupport, DataSourceV2Options}
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Clock, Utils}
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 89a8562..c9aa78a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
case class ContinuousRateStreamPartitionOffset(
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 7f1e8ab..98017c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -26,8 +26,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
-import org.apache.spark.sql.sources.v2.reader.{ContinuousReader, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.{ContinuousWriter, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.util.RpcUtils
private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
index 1c66aed..97bada08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
import org.apache.spark.util.SystemClock
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 972248d..da7c31c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -29,7 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousWriteSupport, MicroBatchWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f17935e..acd5ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index e808ffa..b508f44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.ContinuousWriteSupport
+import org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport
import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 4868ba4..e6cdc06 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.Offset
import org.apache.spark.sql.test.SharedSQLContext
class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
index ceba27b..03d0f63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.reader.Offset
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.util.ManualClock
http://git-wip-us.apache.org/repos/asf/spark/blob/32ec269d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index dc833b2..e11705a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.DataSourceV2Options
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
import org.apache.spark.sql.streaming.StreamTest
class RateSourceV2Suite extends StreamTest {