This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 69dcea2 [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module 69dcea2 is described below commit 69dcea284961668b28d702e90e9068a3b80cbc8a Author: beliefer <belie...@163.com> AuthorDate: Tue Mar 10 18:04:09 2020 +0900 [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module ### What changes were proposed in this pull request? I found a lot of scattered configs in `Streaming`. I think these configs should be arranged in a unified position. ### Why are the changes needed? Arrange scattered config ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing UTs Closes #27744 from beliefer/arrange-scattered-streaming-config. Authored-by: beliefer <belie...@163.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit 8ee41f3576689f3d164131d1e6041bd347394364) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../org/apache/spark/streaming/StreamingConf.scala | 161 +++++++++++++++++++++ .../apache/spark/streaming/StreamingContext.scala | 3 +- .../apache/spark/streaming/dstream/DStream.scala | 3 +- .../spark/streaming/receiver/BlockGenerator.scala | 5 +- .../spark/streaming/receiver/RateLimiter.scala | 5 +- .../spark/streaming/scheduler/JobGenerator.scala | 6 +- .../spark/streaming/scheduler/JobScheduler.scala | 2 +- .../spark/streaming/scheduler/RateController.scala | 3 +- .../streaming/scheduler/rate/RateEstimator.scala | 11 +- .../ui/StreamingJobProgressListener.scala | 4 +- .../org/apache/spark/streaming/util/StateMap.scala | 4 +- .../spark/streaming/util/WriteAheadLogUtils.scala | 42 ++---- .../streaming/ReceiverInputDStreamSuite.scala | 3 +- 13 files changed, 201 insertions(+), 51 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala new file mode 100644 index 0000000..71aefd6 --- /dev/null +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.streaming.util.OpenHashMapBasedStateMap.DELTA_CHAIN_LENGTH_THRESHOLD + +object StreamingConf { + + private[streaming] val BACKPRESSURE_ENABLED = + ConfigBuilder("spark.streaming.backpressure.enabled") + .booleanConf + .createWithDefault(false) + + private[streaming] val RECEIVER_MAX_RATE = + ConfigBuilder("spark.streaming.receiver.maxRate") + .longConf + .createWithDefault(Long.MaxValue) + + private[streaming] val BACKPRESSURE_INITIAL_RATE = + ConfigBuilder("spark.streaming.backpressure.initialRate") + .fallbackConf(RECEIVER_MAX_RATE) + + private[streaming] val BLOCK_INTERVAL = + ConfigBuilder("spark.streaming.blockInterval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("200ms") + + private[streaming] val RECEIVER_WAL_ENABLE_CONF_KEY = + ConfigBuilder("spark.streaming.receiver.writeAheadLog.enable") + .booleanConf + .createWithDefault(false) + + private[streaming] val RECEIVER_WAL_CLASS_CONF_KEY = + 
ConfigBuilder("spark.streaming.receiver.writeAheadLog.class") + .stringConf + .createOptional + + private[streaming] val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY = + ConfigBuilder("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs") + .intConf + .createWithDefault(60) + + private[streaming] val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = + ConfigBuilder("spark.streaming.receiver.writeAheadLog.maxFailures") + .intConf + .createWithDefault(3) + + private[streaming] val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY = + ConfigBuilder("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite") + .booleanConf + .createWithDefault(false) + + private[streaming] val DRIVER_WAL_CLASS_CONF_KEY = + ConfigBuilder("spark.streaming.driver.writeAheadLog.class") + .stringConf + .createOptional + + private[streaming] val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY = + ConfigBuilder("spark.streaming.driver.writeAheadLog.rollingIntervalSecs") + .intConf + .createWithDefault(60) + + private[streaming] val DRIVER_WAL_MAX_FAILURES_CONF_KEY = + ConfigBuilder("spark.streaming.driver.writeAheadLog.maxFailures") + .intConf + .createWithDefault(3) + + private[streaming] val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY = + ConfigBuilder("spark.streaming.driver.writeAheadLog.closeFileAfterWrite") + .booleanConf + .createWithDefault(false) + + private[streaming] val DRIVER_WAL_BATCHING_CONF_KEY = + ConfigBuilder("spark.streaming.driver.writeAheadLog.allowBatching") + .booleanConf + .createWithDefault(true) + + private[streaming] val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = + ConfigBuilder("spark.streaming.driver.writeAheadLog.batchingTimeout") + .longConf + .createWithDefault(5000) + + private[streaming] val STREAMING_UNPERSIST = + ConfigBuilder("spark.streaming.unpersist") + .booleanConf + .createWithDefault(true) + + private[streaming] val STOP_GRACEFULLY_ON_SHUTDOWN = + ConfigBuilder("spark.streaming.stopGracefullyOnShutdown") + .booleanConf + .createWithDefault(false) + + private[streaming] val 
UI_RETAINED_BATCHES = + ConfigBuilder("spark.streaming.ui.retainedBatches") + .intConf + .createWithDefault(1000) + + private[streaming] val SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD = + ConfigBuilder("spark.streaming.sessionByKey.deltaChainThreshold") + .intConf + .createWithDefault(DELTA_CHAIN_LENGTH_THRESHOLD) + + private[streaming] val BACKPRESSURE_RATE_ESTIMATOR = + ConfigBuilder("spark.streaming.backpressure.rateEstimator") + .stringConf + .createWithDefault("pid") + + private[streaming] val BACKPRESSURE_PID_PROPORTIONAL = + ConfigBuilder("spark.streaming.backpressure.pid.proportional") + .doubleConf + .createWithDefault(1.0) + + private[streaming] val BACKPRESSURE_PID_INTEGRAL = + ConfigBuilder("spark.streaming.backpressure.pid.integral") + .doubleConf + .createWithDefault(0.2) + + private[streaming] val BACKPRESSURE_PID_DERIVED = + ConfigBuilder("spark.streaming.backpressure.pid.derived") + .doubleConf + .createWithDefault(0.0) + + private[streaming] val BACKPRESSURE_PID_MIN_RATE = + ConfigBuilder("spark.streaming.backpressure.pid.minRate") + .doubleConf + .createWithDefault(100) + + private[streaming] val CONCURRENT_JOBS = + ConfigBuilder("spark.streaming.concurrentJobs") + .intConf + .createWithDefault(1) + + private[streaming] val GRACEFUL_STOP_TIMEOUT = + ConfigBuilder("spark.streaming.gracefulStopTimeout") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + private[streaming] val MANUAL_CLOCK_JUMP = + ConfigBuilder("spark.streaming.manualClock.jump") + .longConf + .createWithDefault(0) + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 440b653..e3459c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -41,6 +41,7 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope} import 
org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.SerializationDebugger import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingConf.STOP_GRACEFULLY_ON_SHUTDOWN import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver @@ -717,7 +718,7 @@ class StreamingContext private[streaming] ( } private def stopOnShutdown(): Unit = { - val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false) + val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN) logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook") // Do not stop SparkContext, let its own shutdown hook stop it stop(stopSparkContext = false, stopGracefully = stopGracefully) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 6c981b2..e037f26 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingConf.STREAMING_UNPERSIST import org.apache.spark.streaming.StreamingContext.rddToFileName import org.apache.spark.streaming.scheduler.Job import org.apache.spark.ui.{UIUtils => SparkUIUtils} @@ -447,7 +448,7 @@ abstract class DStream[T: ClassTag] ( * this to clear their own metadata along with the generated RDDs. 
*/ private[streaming] def clearMetadata(time: Time): Unit = { - val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true) + val unpersistData = ssc.conf.get(STREAMING_UNPERSIST) val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) logDebug("Clearing references to old RDDs: [" + oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala index 2533c53..d641f55 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.StreamingConf.BLOCK_INTERVAL import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, SystemClock} @@ -100,8 +101,8 @@ private[streaming] class BlockGenerator( } import GeneratorState._ - private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms") - require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value") + private val blockIntervalMs = conf.get(BLOCK_INTERVAL) + require(blockIntervalMs > 0, s"'${BLOCK_INTERVAL.key}' should be a positive value") private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index c620074..f77ca3e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.streaming.StreamingConf.{BACKPRESSURE_INITIAL_RATE, RECEIVER_MAX_RATE} /** * Provides waitToPush() method to limit the rate at which receivers consume data. @@ -37,7 +38,7 @@ import org.apache.spark.internal.Logging private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit - private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) + private val maxRateLimit = conf.get(RECEIVER_MAX_RATE) private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble) def waitToPush(): Unit = { @@ -68,6 +69,6 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { * Get the initial rateLimit to initial rateLimiter */ private def getInitialRateLimit(): Long = { - math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit) + math.min(conf.get(BACKPRESSURE_INITIAL_RATE), maxRateLimit) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 7e8449e..8008a5c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -23,7 +23,7 @@ import scala.util.{Failure, Success, Try} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} +import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, StreamingConf, Time} import org.apache.spark.streaming.api.python.PythonDStream import 
org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils} @@ -115,7 +115,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Stopping JobGenerator gracefully") val timeWhenStopStarted = System.nanoTime() val stopTimeoutMs = conf.getTimeAsMs( - "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms") + StreamingConf.GRACEFUL_STOP_TIMEOUT.key, s"${10 * ssc.graph.batchDuration.milliseconds}ms") val pollTime = 100 // To prevent graceful stop to get stuck permanently @@ -206,7 +206,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds - val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0) + val jumpTime = ssc.sc.conf.get(StreamingConf.MANUAL_CLOCK_JUMP) clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 7eea57c..a6d8dcc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -47,7 +47,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff // https://gist.github.com/AlainODea/1375759b8720a3f9f094 private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] - private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) + private val numConcurrentJobs = ssc.conf.get(StreamingConf.CONCURRENT_JOBS) private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, 
"streaming-job-executor") private[streaming] val jobGenerator = new JobGenerator(this) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala index 7774e85..88f191f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.SparkConf +import org.apache.spark.streaming.StreamingConf.BACKPRESSURE_ENABLED import org.apache.spark.streaming.scheduler.rate.RateEstimator import org.apache.spark.util.{ThreadUtils, Utils} @@ -86,5 +87,5 @@ private[streaming] abstract class RateController(val streamUID: Int, rateEstimat object RateController { def isBackPressureEnabled(conf: SparkConf): Boolean = - conf.getBoolean("spark.streaming.backpressure.enabled", false) + conf.get(BACKPRESSURE_ENABLED) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index e4b9dff..7f4d0f2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler.rate import org.apache.spark.SparkConf import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.StreamingConf._ /** * A component that estimates the rate at which an `InputDStream` should ingest @@ -57,12 +58,12 @@ object RateEstimator { * @throws IllegalArgumentException if the configured RateEstimator is not `pid`. 
*/ def create(conf: SparkConf, batchInterval: Duration): RateEstimator = - conf.get("spark.streaming.backpressure.rateEstimator", "pid") match { + conf.get(BACKPRESSURE_RATE_ESTIMATOR) match { case "pid" => - val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0) - val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2) - val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0) - val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100) + val proportional = conf.get(BACKPRESSURE_PID_PROPORTIONAL) + val integral = conf.get(BACKPRESSURE_PID_INTEGRAL) + val derived = conf.get(BACKPRESSURE_PID_DERIVED) + val minRate = conf.get(BACKPRESSURE_PID_MIN_RATE) new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate) case estimator => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index de73762..da351ec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, Queue} import org.apache.spark.scheduler._ -import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.{StreamingConf, StreamingContext, Time} import org.apache.spark.streaming.scheduler._ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) @@ -33,7 +33,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) private val waitingBatchUIData = new HashMap[Time, BatchUIData] private val runningBatchUIData = new HashMap[Time, BatchUIData] private val completedBatchUIData = new Queue[BatchUIData] - private val batchUIDataLimit = 
ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000) + private val batchUIDataLimit = ssc.conf.get(StreamingConf.UI_RETAINED_BATCHES) private var totalCompletedBatches = 0L private var totalReceivedRecords = 0L private var totalProcessedRecords = 0L diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala index 618c036..4224cef 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala @@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.SparkConf import org.apache.spark.serializer.{KryoInputObjectInputBridge, KryoOutputObjectOutputBridge} +import org.apache.spark.streaming.StreamingConf.SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._ import org.apache.spark.util.collection.OpenHashMap @@ -61,8 +62,7 @@ private[streaming] object StateMap { def empty[K, S]: StateMap[K, S] = new EmptyStateMap[K, S] def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = { - val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold", - DELTA_CHAIN_LENGTH_THRESHOLD) + val deltaChainThreshold = conf.get(SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD) new OpenHashMapBasedStateMap[K, S](deltaChainThreshold) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index b0a4c98..224e782 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -23,52 +23,34 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import 
org.apache.spark.streaming.StreamingConf._ import org.apache.spark.util.Utils /** A helper class with utility functions related to the WriteAheadLog interface */ private[streaming] object WriteAheadLogUtils extends Logging { - val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable" - val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class" - val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY = - "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs" - val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures" - val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY = - "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite" - - val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class" - val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY = - "spark.streaming.driver.writeAheadLog.rollingIntervalSecs" - val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures" - val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching" - val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout" - val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY = - "spark.streaming.driver.writeAheadLog.closeFileAfterWrite" - - val DEFAULT_ROLLING_INTERVAL_SECS = 60 - val DEFAULT_MAX_FAILURES = 3 def enableReceiverLog(conf: SparkConf): Boolean = { - conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false) + conf.get(RECEIVER_WAL_ENABLE_CONF_KEY) } def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = { if (isDriver) { - conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS) + conf.get(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY) } else { - conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS) + conf.get(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY) } } def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = { if (isDriver) { - 
conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES) + conf.get(DRIVER_WAL_MAX_FAILURES_CONF_KEY) } else { - conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES) + conf.get(RECEIVER_WAL_MAX_FAILURES_CONF_KEY) } } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { - isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) + isDriver && conf.get(DRIVER_WAL_BATCHING_CONF_KEY) } /** @@ -76,14 +58,14 @@ private[streaming] object WriteAheadLogUtils extends Logging { * before we fail the write attempt to unblock receivers. */ def getBatchingTimeout(conf: SparkConf): Long = { - conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000) + conf.get(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY) } def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = { if (isDriver) { - conf.getBoolean(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false) + conf.get(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY) } else { - conf.getBoolean(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false) + conf.get(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY) } } @@ -126,9 +108,9 @@ private[streaming] object WriteAheadLogUtils extends Logging { ): WriteAheadLog = { val classNameOption = if (isDriver) { - sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY) + sparkConf.get(DRIVER_WAL_CLASS_CONF_KEY) } else { - sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY) + sparkConf.get(RECEIVER_WAL_CLASS_CONF_KEY) } val wal = classNameOption.map { className => try { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala index 5e2ce25..6b33220 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala @@ -22,6 +22,7 @@ import scala.util.Random import 
org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{StorageLevel, StreamBlockId} +import org.apache.spark.streaming.StreamingConf.RECEIVER_WAL_ENABLE_CONF_KEY import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, Receiver, WriteAheadLogBasedStoreResult} @@ -117,7 +118,7 @@ class ReceiverInputDStreamSuite private def runTest(enableWAL: Boolean, body: ReceiverInputDStream[_] => Unit): Unit = { val conf = new SparkConf() conf.setMaster("local[4]").setAppName("ReceiverInputDStreamSuite") - conf.set(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, enableWAL.toString) + conf.set(StreamingConf.RECEIVER_WAL_ENABLE_CONF_KEY.key, enableWAL.toString) require(WriteAheadLogUtils.enableReceiverLog(conf) === enableWAL) ssc = new StreamingContext(conf, Seconds(1)) val receiverStream = new ReceiverInputDStream[Int](ssc) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org