This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bae5baa  [SPARK-27642][SS] make v1 offset extends v2 offset
bae5baa is described below

commit bae5baae5281d01dc8c67077b90592be857329bd
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue May 7 23:03:15 2019 -0700

    [SPARK-27642][SS] make v1 offset extends v2 offset

    ## What changes were proposed in this pull request?

    To move DS v2 to the catalyst module, we can't make v2 offset rely on v1 offset, as v1 offset is in sql/core.

    ## How was this patch tested?

    existing tests

    Closes #24538 from cloud-fan/offset.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  2 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala     |  4 +--
 .../spark/sql/execution/streaming/Offset.java      | 42 +++------------------
 .../sql/sources/v2/reader/streaming/Offset.java    | 11 ++----
 .../spark/sql/execution/streaming/LongOffset.scala | 14 +-------
 .../execution/streaming/MicroBatchExecution.scala  | 10 +++---
 .../spark/sql/execution/streaming/OffsetSeq.scala  |  9 ++---
 .../sql/execution/streaming/OffsetSeqLog.scala     |  3 +-
 .../sql/execution/streaming/StreamExecution.scala  |  4 +--
 .../sql/execution/streaming/StreamProgress.scala   | 19 +++++-----
 .../spark/sql/execution/streaming/memory.scala     | 25 +++++--------
 .../sources/TextSocketMicroBatchStream.scala       |  5 +--
 .../apache/spark/sql/streaming/StreamTest.scala    |  8 ++---
 13 files changed, 49 insertions(+), 107 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index d60ee1c..92686d2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -76,7 +76,7 @@ class KafkaContinuousStream(
   }
 
   override def planInputPartitions(start: Offset): Array[InputPartition] = {
-    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
+    val oldStartPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
 
     val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
     val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index 8d41c0d..90d7043 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -20,14 +20,14 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
  * their offsets.
 */
 private[kafka010]
-case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
+case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
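The Kafka change above drops the `KafkaSourceOffset.getPartitionOffsets` helper in favor of a plain downcast, which is safe here because `planInputPartitions` only ever receives offsets this stream produced itself. A minimal sketch of the pattern, using simplified stand-in types rather than the real Spark classes:

```scala
// Simplified stand-ins; not the real Spark/Kafka classes.
case class TopicPartition(topic: String, partition: Int)

abstract class Offset { def json(): String }

case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
  // The real class delegates to JsonUtils.partitionOffsets.
  override def json(): String = partitionToOffsets.toString
}

object CastSketch extends App {
  // The engine hands back the generic type; the stream trusts it is its own.
  def partitionOffsets(start: Offset): Map[TopicPartition, Long] =
    start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

  val off: Offset = KafkaSourceOffset(Map(TopicPartition("t", 0) -> 42L))
  assert(partitionOffsets(off)(TopicPartition("t", 0)) == 42L)
}
```

The cast would throw a `ClassCastException` on a foreign offset, which is acceptable once the engine guarantees it only replays offsets produced by the same stream.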
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java
index 43ad4b3..7c167dc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/streaming/Offset.java
@@ -18,44 +18,10 @@
 package org.apache.spark.sql.execution.streaming;
 
 /**
- * This is an internal, deprecated interface. New source implementations should use the
- * org.apache.spark.sql.sources.v2.reader.streaming.Offset class, which is the one that will be
- * supported in the long term.
+ * This class is an alias of {@link org.apache.spark.sql.sources.v2.reader.streaming.Offset}. It's
+ * internal and deprecated. New streaming data source implementations should use data source v2 API,
+ * which will be supported in the long term.
  *
  * This class will be removed in a future release.
  */
-public abstract class Offset {
-  /**
-   * A JSON-serialized representation of an Offset that is
-   * used for saving offsets to the offset log.
-   * Note: We assume that equivalent/equal offsets serialize to
-   * identical JSON strings.
-   *
-   * @return JSON string encoding
-   */
-  public abstract String json();
-
-  /**
-   * Equality based on JSON string representation. We leverage the
-   * JSON representation for normalization between the Offset's
-   * in memory and on disk representations.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof Offset) {
-      return this.json().equals(((Offset) obj).json());
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return this.json().hashCode();
-  }
-
-  @Override
-  public String toString() {
-    return this.json();
-  }
-}
+public abstract class Offset extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index a066713..1d34fdd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -25,13 +25,9 @@ import org.apache.spark.annotation.Evolving;
  * During execution, offsets provided by the data source implementation will be logged and used as
  * restart checkpoints. Each source should provide an offset implementation which the source can use
  * to reconstruct a position in the stream up to which data has been seen/processed.
- *
- * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
- *       maintain compatibility with DataSource V1 APIs. This extension will be removed once we
- *       get rid of V1 completely.
  */
 @Evolving
-public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
+public abstract class Offset {
   /**
    * A JSON-serialized representation of an Offset that is
    * used for saving offsets to the offset log.
@@ -49,9 +45,8 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Of
    */
   @Override
   public boolean equals(Object obj) {
-    if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
-      return this.json()
-          .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
+    if (obj instanceof Offset) {
+      return this.json().equals(((Offset) obj).json());
     } else {
       return false;
     }
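The two `Offset.java` changes flip the inheritance direction: the v1 class becomes an empty alias extending the v2 class, which now carries `json()`, `equals`, and `hashCode` itself. Since equality is defined on the serialized form, offsets of different concrete classes still compare equal whenever their JSON matches. A sketch of that contract (a simplified Scala rendering of the Java code, not the actual source):

```scala
// v2-style base: equality and hashing delegate to the JSON form, so
// equivalent offsets must serialize to identical strings.
abstract class OffsetV2 {
  def json(): String
  override def equals(obj: Any): Boolean = obj match {
    case o: OffsetV2 => json() == o.json()
    case _           => false
  }
  override def hashCode(): Int = json().hashCode
  override def toString: String = json()
}

// v1 is now just an alias inheriting all behavior.
abstract class OffsetV1 extends OffsetV2

object EqualitySketch extends App {
  // Plain classes (not case classes) so the inherited equals is kept.
  class LongLike(n: Long)  extends OffsetV1 { def json(): String = n.toString }
  class RawJson(s: String) extends OffsetV2 { def json(): String = s }

  assert(new LongLike(5) == new RawJson("5")) // equal across v1/v2 when JSON matches
}
```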
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 3ff5b86..a27898c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
-
 /**
  * A simple offset for sources that produce a single linear stream of data.
  */
-case class LongOffset(offset: Long) extends OffsetV2 {
+case class LongOffset(offset: Long) extends Offset {
 
   override val json = offset.toString
 
@@ -37,14 +35,4 @@ object LongOffset {
    * @return new LongOffset
    */
   def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)
-
-  /**
-   * Convert generic Offset to LongOffset if possible.
-   * @return converted LongOffset
-   */
-  def convert(offset: Offset): Option[LongOffset] = offset match {
-    case lo: LongOffset => Some(lo)
-    case so: SerializedOffset => Some(LongOffset(so))
-    case _ => None
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 58c265d..7a3cdbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -296,7 +296,7 @@ class MicroBatchExecution(
              * batch will be executed before getOffset is called again. */
             availableOffsets.foreach {
               case (source: Source, end: Offset) =>
-                val start = committedOffsets.get(source)
+                val start = committedOffsets.get(source).map(_.asInstanceOf[Offset])
                 source.getBatch(start, end)
               case nonV1Tuple =>
                 // The V2 API does not have the same edge case requiring getBatch to be called
@@ -354,7 +354,7 @@ class MicroBatchExecution(
     if (isCurrentBatchConstructed) return true
 
     // Generate a map from each unique source to the next available offset.
-    val latestOffsets: Map[SparkDataStream, Option[Offset]] = uniqueSources.map {
+    val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
       case s: Source =>
         updateStatusMessage(s"Getting offsets from $s")
         reportTimeTaken("getOffset") {
@@ -411,7 +411,7 @@ class MicroBatchExecution(
     val prevBatchOff = offsetLog.get(currentBatchId - 1)
     if (prevBatchOff.isDefined) {
       prevBatchOff.get.toStreamProgress(sources).foreach {
-        case (src: Source, off) => src.commit(off)
+        case (src: Source, off: Offset) => src.commit(off)
         case (stream: MicroBatchStream, off) =>
           stream.commit(stream.deserializeOffset(off.json))
         case (src, _) =>
@@ -448,9 +448,9 @@ class MicroBatchExecution(
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
-        case (source: Source, available)
+        case (source: Source, available: Offset)
           if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
-          val current = committedOffsets.get(source)
+          val current = committedOffsets.get(source).map(_.asInstanceOf[Offset])
           val batch = source.getBatch(current, available)
           assert(batch.isStreaming,
             s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
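Deleting `LongOffset.convert` shifts responsibility to the engine: a `SerializedOffset` read back from the checkpoint log must now go through the v2 `deserializeOffset` path (visible in the `MicroBatchStream` branch above) before it reaches `commit` or `getBatch`, so sources can simply cast. A hedged before/after sketch with simplified types:

```scala
object ConvertSketch {
  abstract class Offset { def json(): String }
  case class LongOffset(offset: Long)    extends Offset { def json(): String = offset.toString }
  case class SerializedOffset(s: String) extends Offset { def json(): String = s }

  // Before this patch: a lenient converter also accepted the raw on-disk form.
  def convert(offset: Offset): Option[LongOffset] = offset match {
    case lo: LongOffset       => Some(lo)
    case so: SerializedOffset => Some(LongOffset(so.json().toLong))
    case _                    => None
  }

  // After: the source assumes the engine already deserialized the offset
  // and just casts, failing fast on anything foreign.
  def commitEnd(end: Offset): LongOffset = end.asInstanceOf[LongOffset]
}
```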
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 0f7ad75..b6fa2e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -24,14 +24,15 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.RuntimeConfig
 import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
 import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
+
 
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
  * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
  * vector clock that must progress linearly forward.
  */
-case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
+case class OffsetSeq(offsets: Seq[Option[OffsetV2]], metadata: Option[OffsetSeqMetadata] = None) {
 
   /**
    * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
@@ -57,13 +58,13 @@ object OffsetSeq {
    * Returns a [[OffsetSeq]] with a variable sequence of offsets.
    * `nulls` in the sequence are converted to `None`s.
    */
-  def fill(offsets: Offset*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
+  def fill(offsets: OffsetV2*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
 
   /**
    * Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
    * `nulls` in the sequence are converted to `None`s.
    */
-  def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
+  def fill(metadata: Option[String], offsets: OffsetV2*): OffsetSeq = {
     OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index 2c8d7c7..8a05dad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets._
 import scala.io.{Source => IOSource}
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 
 /**
  * This class is used to log offsets to persistent files in HDFS.
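`OffsetSeq` stores one `Option[OffsetV2]` per source, and the log writes a missing offset as a sentinel token, reading everything else back as an opaque `SerializedOffset`. A condensed sketch of that round-trip, assuming the sentinel is `"-"` as in the real `OffsetSeqLog` (simplified; the real log also carries a version line and metadata):

```scala
object OffsetLogSketch extends App {
  val SERIALIZED_VOID_OFFSET = "-" // assumed sentinel for "no offset"

  abstract class OffsetV2 { def json(): String }
  case class SerializedOffset(s: String) extends OffsetV2 { def json(): String = s }

  // Writing: None becomes the sentinel, anything else its JSON.
  def serializeOne(o: Option[OffsetV2]): String =
    o.map(_.json()).getOrElse(SERIALIZED_VOID_OFFSET)

  // Reading: the sentinel becomes null, which Option(_) later turns into None.
  def parseOffset(value: String): OffsetV2 = value match {
    case SERIALIZED_VOID_OFFSET => null
    case json                   => SerializedOffset(json)
  }

  assert(Option(parseOffset(serializeOne(None))).isEmpty)
  assert(parseOffset("""{"t-0":42}""").json() == """{"t-0":42}""")
}
```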
@@ -47,7 +48,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 
   override protected def deserialize(in: InputStream): OffsetSeq = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    def parseOffset(value: String): Offset = value match {
+    def parseOffset(value: String): OffsetV2 = value match {
       case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
       case json => SerializedOffset(json)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5d66b61..4c08b3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
 import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
@@ -438,7 +438,7 @@ abstract class StreamExecution(
    * Blocks the current thread until processing for data from the given `source` has reached at
    * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
-  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset, timeoutMs: Long): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: OffsetV2, timeoutMs: Long): Unit = {
     assertAwaitThread()
     def notDone = {
       val localCommittedOffsets = committedOffsets
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 8a1d064..8783eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -19,15 +19,16 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.collection.{immutable, GenTraversableOnce}
 
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
+
 
 /**
  * A helper class that looks like a Map[Source, Offset].
 */
 class StreamProgress(
-    val baseMap: immutable.Map[SparkDataStream, Offset] =
-      new immutable.HashMap[SparkDataStream, Offset])
-  extends scala.collection.immutable.Map[SparkDataStream, Offset] {
+    val baseMap: immutable.Map[SparkDataStream, OffsetV2] =
+      new immutable.HashMap[SparkDataStream, OffsetV2])
+  extends scala.collection.immutable.Map[SparkDataStream, OffsetV2] {
 
   def toOffsetSeq(source: Seq[SparkDataStream], metadata: OffsetSeqMetadata): OffsetSeq = {
     OffsetSeq(source.map(get), Some(metadata))
@@ -36,17 +37,17 @@ class StreamProgress(
   override def toString: String =
     baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
 
-  override def +[B1 >: Offset](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = {
+  override def +[B1 >: OffsetV2](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = {
     baseMap + kv
   }
 
-  override def get(key: SparkDataStream): Option[Offset] = baseMap.get(key)
+  override def get(key: SparkDataStream): Option[OffsetV2] = baseMap.get(key)
 
-  override def iterator: Iterator[(SparkDataStream, Offset)] = baseMap.iterator
+  override def iterator: Iterator[(SparkDataStream, OffsetV2)] = baseMap.iterator
 
-  override def -(key: SparkDataStream): Map[SparkDataStream, Offset] = baseMap - key
+  override def -(key: SparkDataStream): Map[SparkDataStream, OffsetV2] = baseMap - key
 
-  def ++(updates: GenTraversableOnce[(SparkDataStream, Offset)]): StreamProgress = {
+  def ++(updates: GenTraversableOnce[(SparkDataStream, OffsetV2)]): StreamProgress = {
     new StreamProgress(baseMap ++ updates)
   }
 }
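`StreamProgress` is a thin immutable-`Map` facade over `baseMap`; note the asymmetry that `+` widens to a plain `Map` (its signature is forced by the `Map` contract) while the custom `++` preserves the wrapper type. A generic sketch of the same shape, written against Scala 2.12-era collections (what Spark used at the time; 2.13 replaced these abstract methods with `updated`/`removed`):

```scala
import scala.collection.immutable

// A Map facade in the shape of StreamProgress, with generic stand-in types.
class Progress[K, V](val baseMap: immutable.Map[K, V] = immutable.HashMap.empty[K, V])
  extends immutable.Map[K, V] {

  // The four required Map overrides all delegate to baseMap.
  override def +[B1 >: V](kv: (K, B1)): immutable.Map[K, B1] = baseMap + kv // widens
  override def get(key: K): Option[V] = baseMap.get(key)
  override def iterator: Iterator[(K, V)] = baseMap.iterator
  override def -(key: K): immutable.Map[K, V] = baseMap - key

  // Custom ++ keeps the wrapper type, unlike the inherited +.
  def ++(updates: TraversableOnce[(K, V)]): Progress[K, V] =
    new Progress(baseMap ++ updates)
}
```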
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 022c8da..df14955 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -61,10 +61,12 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
     Dataset.ofRows(sqlContext.sparkSession, logicalPlan)
   }
 
-  def addData(data: A*): Offset = {
+  def addData(data: A*): OffsetV2 = {
     addData(data.toTraversable)
   }
 
+  def addData(data: TraversableOnce[A]): OffsetV2
+
   def fullSchema(): StructType = encoder.schema
 
   protected val logicalPlan: LogicalPlan = {
@@ -77,8 +79,6 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
       None)(sqlContext.sparkSession)
   }
 
-  def addData(data: TraversableOnce[A]): Offset
-
   override def initialOffset(): OffsetV2 = {
     throw new IllegalStateException("should not be called.")
   }
@@ -226,22 +226,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   }
 
   override def commit(end: OffsetV2): Unit = synchronized {
-    def check(newOffset: LongOffset): Unit = {
-      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+    val newOffset = end.asInstanceOf[LongOffset]
+    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
-      if (offsetDiff < 0) {
-        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
-      }
-
-      batches.trimStart(offsetDiff)
-      lastOffsetCommitted = newOffset
+    if (offsetDiff < 0) {
+      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
     }
 
-    LongOffset.convert(end) match {
-      case Some(lo) => check(lo)
-      case None => sys.error(s"MemoryStream.commit() received an offset ($end) " +
-        "that did not originate with an instance of this class")
-    }
+    batches.trimStart(offsetDiff)
+    lastOffsetCommitted = newOffset
   }
 
   override def stop() {}
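With the `convert` fallback gone, `MemoryStream.commit` keeps only the ordering check and the buffer trim. The retained logic, extracted into a self-contained sketch (simplified buffer and state, not the real class):

```scala
import scala.collection.mutable.ListBuffer

object MemoryCommitSketch extends App {
  case class LongOffset(offset: Long)

  private val batches = ListBuffer("batch-0", "batch-1", "batch-2")
  private var lastOffsetCommitted = LongOffset(-1L)

  def commit(newOffset: LongOffset): Unit = synchronized {
    val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
    if (offsetDiff < 0) {
      // Committed offsets must never move backwards.
      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $newOffset")
    }
    batches.trimStart(offsetDiff) // drop data the engine has fully processed
    lastOffsetCommitted = newOffset
  }

  commit(LongOffset(1L)) // diff of 2 trims two entries
  assert(batches.size == 1)
}
```

The same cast-and-trim shape appears in `TextSocketMicroBatchStream.commit` below.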
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index dab64e1..25e9af2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -150,10 +150,7 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
   }
 
   override def commit(end: Offset): Unit = synchronized {
-    val newOffset = LongOffset.convert(end).getOrElse(
-      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
-        s"originate with an instance of this class")
-    )
+    val newOffset = end.asInstanceOf[LongOffset]
 
     val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 89c62ba..3a4414f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
 import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -124,7 +124,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     * the active query, and then return the source object the data was added, as well as the
     * offset of added data.
     */
-    def addData(query: Option[StreamExecution]): (SparkDataStream, Offset)
+    def addData(query: Option[StreamExecution]): (SparkDataStream, OffsetV2)
   }
 
   /** A trait that can be extended when testing a source. */
@@ -135,7 +135,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   case class AddDataMemory[A](source: MemoryStreamBase[A], data: Seq[A]) extends AddData {
     override def toString: String = s"AddData to $source: ${data.mkString(",")}"
 
-    override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
+    override def addData(query: Option[StreamExecution]): (SparkDataStream, OffsetV2) = {
       (source, source.addData(data))
     }
   }
@@ -337,7 +337,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     var pos = 0
     var currentStream: StreamExecution = null
     var lastStream: StreamExecution = null
-    val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
+    val awaiting = new mutable.HashMap[Int, OffsetV2]() // source index -> offset to wait for
     val sink = new MemorySink
     val resetConfValues = mutable.Map[String, Option[String]]()
     val defaultCheckpointLocation =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org