Repository: spark
Updated Branches:
  refs/heads/master 3ff83ad43 -> fe73cb4b4

[SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset

## What changes were proposed in this pull request?

In the documentation of `ContinuousReader.setOffset`, we say this method is used to specify the start offset. We also have a `ContinuousReader.getStartOffset` to get the value back. I think it makes more sense to rename `ContinuousReader.setOffset` to `setStartOffset`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #20486 from cloud-fan/rename.
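For illustration, a minimal sketch of how the renamed method is driven from the caller's side, mirroring the `ContinuousExecution` change below. The names `ResumeExample`, `resume`, `reader` and `checkpointedOffsetJson` are hypothetical and not part of this patch:

```scala
import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader

object ResumeExample {
  // Pass the restored offset when one was checkpointed; otherwise pass
  // Optional.empty() so the reader falls back to its own inferred start.
  def resume(reader: ContinuousReader, checkpointedOffsetJson: Option[String]): Unit = {
    val restored = checkpointedOffsetJson.map(json => reader.deserializeOffset(json))
    reader.setStartOffset(Optional.ofNullable(restored.orNull))
    assert(reader.getStartOffset != null) // the specified or inferred start offset
  }
}
```

Passing `Optional.empty()` (as the RateSourceV2Suite test below does) asks the reader to infer its own starting point.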
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe73cb4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe73cb4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe73cb4b

Branch: refs/heads/master
Commit: fe73cb4b439169f16cc24cd851a11fd398ce7edf
Parents: 3ff83ad
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Feb 2 20:49:08 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Feb 2 20:49:08 2018 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaContinuousReader.scala    | 2 +-
 .../spark/sql/sources/v2/reader/streaming/ContinuousReader.java  | 4 ++--
 .../sql/execution/streaming/continuous/ContinuousExecution.scala | 2 +-
 .../streaming/continuous/ContinuousRateStreamSource.scala        | 2 +-
 .../apache/spark/sql/execution/streaming/RateSourceV2Suite.scala | 2 +-
 .../spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala | 2 +-
 6 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 41c443b..b049a05 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -71,7 +71,7 @@ class KafkaContinuousReader(
   override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
 
   private var offset: Offset = _
-  override def setOffset(start: ju.Optional[Offset]): Unit = {
+  override def setStartOffset(start: ju.Optional[Offset]): Unit = {
     offset = start.orElse {
       val offsets = initialOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
index d1d1e7f..7fe7f00 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -51,12 +51,12 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
    * start from the first record after the provided offset, or from an implementation-defined
    * inferred starting point if no offset is provided.
    */
-  void setOffset(Optional<Offset> start);
+  void setStartOffset(Optional<Offset> start);
 
   /**
   * Return the specified or inferred start offset for this reader.
   *
-  * @throws IllegalStateException if setOffset has not been called
+  * @throws IllegalStateException if setStartOffset has not been called
   */
  Offset getStartOffset();
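This is the contract the readers in this patch implement: use the provided offset, or fall back to an implementation-defined start, and surface the result through getStartOffset. As a hypothetical sketch, not part of the patch, the minimal state an implementation keeps might look like the following; `StartOffsetState` and `defaultOffset` are illustrative names, and the v2 streaming `Offset` class is assumed to live in the package shown above:

```scala
import java.util.Optional

import org.apache.spark.sql.sources.v2.reader.streaming.Offset

// Illustrative only: store the caller-provided start offset, or compute a
// source-specific default when none is given, and expose it afterwards.
class StartOffsetState(defaultOffset: Offset) {
  private var offset: Offset = _

  def setStartOffset(start: Optional[Offset]): Unit = {
    offset = start.orElse(defaultOffset)
  }

  def getStartOffset: Offset = {
    if (offset == null) {
      throw new IllegalStateException("setStartOffset has not been called")
    }
    offset
  }
}
```

The KafkaContinuousReader change above and the RateStreamContinuousReader change below follow the same `start.orElse(...)` pattern, differing only in how the default offset is computed.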
http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 08c8141..ed22b91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -181,7 +181,7 @@ class ContinuousExecution(
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
-        reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
+        reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
 
         new StreamingDataSourceV2Relation(newOutput, reader)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 0eaaa48..b63d8d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -61,7 +61,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
   private var offset: Offset = _
 
-  override def setOffset(offset: java.util.Optional[Offset]): Unit = {
+  override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
     this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
index 3158995..0d68d9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala
@@ -160,7 +160,7 @@ class RateSourceV2Suite extends StreamTest {
   test("continuous data") {
     val reader = new RateStreamContinuousReader(
       new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
-    reader.setOffset(Optional.empty())
+    reader.setStartOffset(Optional.empty())
     val tasks = reader.createDataReaderFactories()
     assert(tasks.size == 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/fe73cb4b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index cb873ab..51f44fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -43,7 +43,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def readSchema(): StructType = StructType(Seq())
   def stop(): Unit = {}
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
-  def setOffset(start: Optional[Offset]): Unit = {}
+  def setStartOffset(start: Optional[Offset]): Unit = {}
 
   def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org