This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 69dcea2 [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module
69dcea2 is described below
commit 69dcea284961668b28d702e90e9068a3b80cbc8a
Author: beliefer <[email protected]>
AuthorDate: Tue Mar 10 18:04:09 2020 +0900
[SPARK-30992][DSTREAMS] Arrange scattered config of streaming module
### What changes were proposed in this pull request?
There is a lot of scattered config in the `Streaming` module. I think these configs should be arranged in one unified place.
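As a rough sketch of the pattern (not part of the patch itself; the object name and the `spark.streaming.example.stopFlag` key below are hypothetical placeholders), a scattered string-key lookup is replaced by a typed `ConfigEntry` declared once, which is the same shape as the real entries such as `StreamingConf.STOP_GRACEFULLY_ON_SHUTDOWN`:

```scala
package org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// Illustrative only: shows the before/after shape of the change.
private[streaming] object ConfigPatternSketch {

  // Before: each call site repeats the raw key string and its default value.
  def readFlagOld(conf: SparkConf): Boolean =
    conf.getBoolean("spark.streaming.example.stopFlag", defaultValue = false)

  // After: the key, type and default are declared once as a ConfigEntry.
  private val EXAMPLE_STOP_FLAG =
    ConfigBuilder("spark.streaming.example.stopFlag")
      .booleanConf
      .createWithDefault(false)

  // Call sites then read the typed entry instead of the raw string.
  def readFlagNew(conf: SparkConf): Boolean = conf.get(EXAMPLE_STOP_FLAG)
}
```

Note that `ConfigBuilder` and `SparkConf.get(ConfigEntry)` are Spark-internal (`private[spark]`) APIs, which is why the sketch sits inside the `org.apache.spark.streaming` package.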
### Why are the changes needed?
Consolidating the scattered configs into a single `StreamingConf` object makes them easier to find and maintain.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing unit tests.
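For reference, a minimal usage sketch (app name and values below are arbitrary): applications keep setting these options by their public string keys; only the internal lookup moves into `StreamingConf`.

```scala
import org.apache.spark.SparkConf

object StreamingConfKeysSketch {
  def main(args: Array[String]): Unit = {
    // The public keys are unchanged by this patch; only how Spark reads them internally changes.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("streaming-conf-sketch")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.blockInterval", "200ms")

    println(conf.get("spark.streaming.blockInterval")) // prints 200ms
  }
}
```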
Closes #27744 from beliefer/arrange-scattered-streaming-config.
Authored-by: beliefer <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 8ee41f3576689f3d164131d1e6041bd347394364)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../org/apache/spark/streaming/StreamingConf.scala | 161 +++++++++++++++++++++
.../apache/spark/streaming/StreamingContext.scala | 3 +-
.../apache/spark/streaming/dstream/DStream.scala | 3 +-
.../spark/streaming/receiver/BlockGenerator.scala | 5 +-
.../spark/streaming/receiver/RateLimiter.scala | 5 +-
.../spark/streaming/scheduler/JobGenerator.scala | 6 +-
.../spark/streaming/scheduler/JobScheduler.scala | 2 +-
.../spark/streaming/scheduler/RateController.scala | 3 +-
.../streaming/scheduler/rate/RateEstimator.scala | 11 +-
.../ui/StreamingJobProgressListener.scala | 4 +-
.../org/apache/spark/streaming/util/StateMap.scala | 4 +-
.../spark/streaming/util/WriteAheadLogUtils.scala | 42 ++----
.../streaming/ReceiverInputDStreamSuite.scala | 3 +-
13 files changed, 201 insertions(+), 51 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
new file mode 100644
index 0000000..71aefd6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.streaming.util.OpenHashMapBasedStateMap.DELTA_CHAIN_LENGTH_THRESHOLD
+
+object StreamingConf {
+
+ private[streaming] val BACKPRESSURE_ENABLED =
+ ConfigBuilder("spark.streaming.backpressure.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[streaming] val RECEIVER_MAX_RATE =
+ ConfigBuilder("spark.streaming.receiver.maxRate")
+ .longConf
+ .createWithDefault(Long.MaxValue)
+
+ private[streaming] val BACKPRESSURE_INITIAL_RATE =
+ ConfigBuilder("spark.streaming.backpressure.initialRate")
+ .fallbackConf(RECEIVER_MAX_RATE)
+
+ private[streaming] val BLOCK_INTERVAL =
+ ConfigBuilder("spark.streaming.blockInterval")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("200ms")
+
+ private[streaming] val RECEIVER_WAL_ENABLE_CONF_KEY =
+ ConfigBuilder("spark.streaming.receiver.writeAheadLog.enable")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[streaming] val RECEIVER_WAL_CLASS_CONF_KEY =
+ ConfigBuilder("spark.streaming.receiver.writeAheadLog.class")
+ .stringConf
+ .createOptional
+
+ private[streaming] val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+ ConfigBuilder("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs")
+ .intConf
+ .createWithDefault(60)
+
+ private[streaming] val RECEIVER_WAL_MAX_FAILURES_CONF_KEY =
+ ConfigBuilder("spark.streaming.receiver.writeAheadLog.maxFailures")
+ .intConf
+ .createWithDefault(3)
+
+ private[streaming] val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+ ConfigBuilder("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[streaming] val DRIVER_WAL_CLASS_CONF_KEY =
+ ConfigBuilder("spark.streaming.driver.writeAheadLog.class")
+ .stringConf
+ .createOptional
+
+ private[streaming] val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
+ ConfigBuilder("spark.streaming.driver.writeAheadLog.rollingIntervalSecs")
+ .intConf
+ .createWithDefault(60)
+
+ private[streaming] val DRIVER_WAL_MAX_FAILURES_CONF_KEY =
+ ConfigBuilder("spark.streaming.driver.writeAheadLog.maxFailures")
+ .intConf
+ .createWithDefault(3)
+
+ private[streaming] val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+ ConfigBuilder("spark.streaming.driver.writeAheadLog.closeFileAfterWrite")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[streaming] val DRIVER_WAL_BATCHING_CONF_KEY =
+ ConfigBuilder("spark.streaming.driver.writeAheadLog.allowBatching")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[streaming] val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY =
+ ConfigBuilder("spark.streaming.driver.writeAheadLog.batchingTimeout")
+ .longConf
+ .createWithDefault(5000)
+
+ private[streaming] val STREAMING_UNPERSIST =
+ ConfigBuilder("spark.streaming.unpersist")
+ .booleanConf
+ .createWithDefault(true)
+
+ private[streaming] val STOP_GRACEFULLY_ON_SHUTDOWN =
+ ConfigBuilder("spark.streaming.stopGracefullyOnShutdown")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[streaming] val UI_RETAINED_BATCHES =
+ ConfigBuilder("spark.streaming.ui.retainedBatches")
+ .intConf
+ .createWithDefault(1000)
+
+ private[streaming] val SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD =
+ ConfigBuilder("spark.streaming.sessionByKey.deltaChainThreshold")
+ .intConf
+ .createWithDefault(DELTA_CHAIN_LENGTH_THRESHOLD)
+
+ private[streaming] val BACKPRESSURE_RATE_ESTIMATOR =
+ ConfigBuilder("spark.streaming.backpressure.rateEstimator")
+ .stringConf
+ .createWithDefault("pid")
+
+ private[streaming] val BACKPRESSURE_PID_PROPORTIONAL =
+ ConfigBuilder("spark.streaming.backpressure.pid.proportional")
+ .doubleConf
+ .createWithDefault(1.0)
+
+ private[streaming] val BACKPRESSURE_PID_INTEGRAL =
+ ConfigBuilder("spark.streaming.backpressure.pid.integral")
+ .doubleConf
+ .createWithDefault(0.2)
+
+ private[streaming] val BACKPRESSURE_PID_DERIVED =
+ ConfigBuilder("spark.streaming.backpressure.pid.derived")
+ .doubleConf
+ .createWithDefault(0.0)
+
+ private[streaming] val BACKPRESSURE_PID_MIN_RATE =
+ ConfigBuilder("spark.streaming.backpressure.pid.minRate")
+ .doubleConf
+ .createWithDefault(100)
+
+ private[streaming] val CONCURRENT_JOBS =
+ ConfigBuilder("spark.streaming.concurrentJobs")
+ .intConf
+ .createWithDefault(1)
+
+ private[streaming] val GRACEFUL_STOP_TIMEOUT =
+ ConfigBuilder("spark.streaming.gracefulStopTimeout")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
+ private[streaming] val MANUAL_CLOCK_JUMP =
+ ConfigBuilder("spark.streaming.manualClock.jump")
+ .longConf
+ .createWithDefault(0)
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 440b653..e3459c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -41,6 +41,7 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingConf.STOP_GRACEFULLY_ON_SHUTDOWN
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
@@ -717,7 +718,7 @@ class StreamingContext private[streaming] (
}
private def stopOnShutdown(): Unit = {
- val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
+ val stopGracefully = conf.get(STOP_GRACEFULLY_ON_SHUTDOWN)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 6c981b2..e037f26 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.rdd.{BlockRDD, RDD, RDDOperationScope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingConf.STREAMING_UNPERSIST
import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
@@ -447,7 +448,7 @@ abstract class DStream[T: ClassTag] (
* this to clear their own metadata along with the generated RDDs.
*/
private[streaming] def clearMetadata(time: Time): Unit = {
- val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
+ val unpersistData = ssc.conf.get(STREAMING_UNPERSIST)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 2533c53..d641f55 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StreamBlockId
+import org.apache.spark.streaming.StreamingConf.BLOCK_INTERVAL
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, SystemClock}
@@ -100,8 +101,8 @@ private[streaming] class BlockGenerator(
}
import GeneratorState._
- private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
- require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+ private val blockIntervalMs = conf.get(BLOCK_INTERVAL)
+ require(blockIntervalMs > 0, s"'${BLOCK_INTERVAL.key}' should be a positive value")
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index c620074..f77ca3e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.StreamingConf.{BACKPRESSURE_INITIAL_RATE, RECEIVER_MAX_RATE}
/**
* Provides waitToPush() method to limit the rate at which receivers consume data.
@@ -37,7 +38,7 @@ import org.apache.spark.internal.Logging
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
// treated as an upper limit
- private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
+ private val maxRateLimit = conf.get(RECEIVER_MAX_RATE)
private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
def waitToPush(): Unit = {
@@ -68,6 +69,6 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
* Get the initial rateLimit to initial rateLimiter
*/
private def getInitialRateLimit(): Long = {
- math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
+ math.min(conf.get(BACKPRESSURE_INITIAL_RATE), maxRateLimit)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 7e8449e..8008a5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -23,7 +23,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, StreamingConf, Time}
import org.apache.spark.streaming.api.python.PythonDStream
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
@@ -115,7 +115,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Stopping JobGenerator gracefully")
val timeWhenStopStarted = System.nanoTime()
val stopTimeoutMs = conf.getTimeAsMs(
- "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")
+ StreamingConf.GRACEFUL_STOP_TIMEOUT.key, s"${10 * ssc.graph.batchDuration.milliseconds}ms")
val pollTime = 100
// To prevent graceful stop to get stuck permanently
@@ -206,7 +206,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
- val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
+ val jumpTime = ssc.sc.conf.get(StreamingConf.MANUAL_CLOCK_JUMP)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 7eea57c..a6d8dcc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -47,7 +47,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
// https://gist.github.com/AlainODea/1375759b8720a3f9f094
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
- private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
+ private val numConcurrentJobs = ssc.conf.get(StreamingConf.CONCURRENT_JOBS)
private val jobExecutor =
ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private[streaming] val jobGenerator = new JobGenerator(this)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
index 7774e85..88f191f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.SparkConf
+import org.apache.spark.streaming.StreamingConf.BACKPRESSURE_ENABLED
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -86,5 +87,5 @@ private[streaming] abstract class RateController(val streamUID: Int, rateEstimat
object RateController {
def isBackPressureEnabled(conf: SparkConf): Boolean =
- conf.getBoolean("spark.streaming.backpressure.enabled", false)
+ conf.get(BACKPRESSURE_ENABLED)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index e4b9dff..7f4d0f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler.rate
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingConf._
/**
* A component that estimates the rate at which an `InputDStream` should ingest
@@ -57,12 +58,12 @@ object RateEstimator {
* @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
*/
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
- conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
+ conf.get(BACKPRESSURE_RATE_ESTIMATOR) match {
case "pid" =>
- val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
- val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
- val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
- val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
+ val proportional = conf.get(BACKPRESSURE_PID_PROPORTIONAL)
+ val integral = conf.get(BACKPRESSURE_PID_INTEGRAL)
+ val derived = conf.get(BACKPRESSURE_PID_DERIVED)
+ val minRate = conf.get(BACKPRESSURE_PID_MIN_RATE)
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index de73762..da351ec 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, Queue}
import org.apache.spark.scheduler._
-import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.{StreamingConf, StreamingContext, Time}
import org.apache.spark.streaming.scheduler._
private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -33,7 +33,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]
private val completedBatchUIData = new Queue[BatchUIData]
- private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
+ private val batchUIDataLimit = ssc.conf.get(StreamingConf.UI_RETAINED_BATCHES)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
index 618c036..4224cef 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/StateMap.scala
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoInputObjectInputBridge, KryoOutputObjectOutputBridge}
+import org.apache.spark.streaming.StreamingConf.SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD
import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
import org.apache.spark.util.collection.OpenHashMap
@@ -61,8 +62,7 @@ private[streaming] object StateMap {
def empty[K, S]: StateMap[K, S] = new EmptyStateMap[K, S]
def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
- val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
- DELTA_CHAIN_LENGTH_THRESHOLD)
+ val deltaChainThreshold = conf.get(SESSION_BY_KEY_DELTA_CHAIN_THRESHOLD)
new OpenHashMapBasedStateMap[K, S](deltaChainThreshold)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index b0a4c98..224e782 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -23,52 +23,34 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.StreamingConf._
import org.apache.spark.util.Utils
/** A helper class with utility functions related to the WriteAheadLog interface */
private[streaming] object WriteAheadLogUtils extends Logging {
- val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
- val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
- val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
- "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
- val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
- val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
- "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
-
- val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
- val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
- "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
- val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
- val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching"
- val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout"
- val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
- "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
-
- val DEFAULT_ROLLING_INTERVAL_SECS = 60
- val DEFAULT_MAX_FAILURES = 3
def enableReceiverLog(conf: SparkConf): Boolean = {
- conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
+ conf.get(RECEIVER_WAL_ENABLE_CONF_KEY)
}
def getRollingIntervalSecs(conf: SparkConf, isDriver: Boolean): Int = {
if (isDriver) {
- conf.getInt(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+ conf.get(DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY)
} else {
- conf.getInt(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY, DEFAULT_ROLLING_INTERVAL_SECS)
+ conf.get(RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY)
}
}
def getMaxFailures(conf: SparkConf, isDriver: Boolean): Int = {
if (isDriver) {
- conf.getInt(DRIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+ conf.get(DRIVER_WAL_MAX_FAILURES_CONF_KEY)
} else {
- conf.getInt(RECEIVER_WAL_MAX_FAILURES_CONF_KEY, DEFAULT_MAX_FAILURES)
+ conf.get(RECEIVER_WAL_MAX_FAILURES_CONF_KEY)
}
}
def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
- isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
+ isDriver && conf.get(DRIVER_WAL_BATCHING_CONF_KEY)
}
/**
@@ -76,14 +58,14 @@ private[streaming] object WriteAheadLogUtils extends Logging {
* before we fail the write attempt to unblock receivers.
*/
def getBatchingTimeout(conf: SparkConf): Long = {
- conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
+ conf.get(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY)
}
def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = {
if (isDriver) {
- conf.getBoolean(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+ conf.get(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY)
} else {
- conf.getBoolean(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+ conf.get(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY)
}
}
@@ -126,9 +108,9 @@ private[streaming] object WriteAheadLogUtils extends Logging {
): WriteAheadLog = {
val classNameOption = if (isDriver) {
- sparkConf.getOption(DRIVER_WAL_CLASS_CONF_KEY)
+ sparkConf.get(DRIVER_WAL_CLASS_CONF_KEY)
} else {
- sparkConf.getOption(RECEIVER_WAL_CLASS_CONF_KEY)
+ sparkConf.get(RECEIVER_WAL_CLASS_CONF_KEY)
}
val wal = classNameOption.map { className =>
try {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
index 5e2ce25..6b33220 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverInputDStreamSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.StreamingConf.RECEIVER_WAL_ENABLE_CONF_KEY
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, Receiver, WriteAheadLogBasedStoreResult}
@@ -117,7 +118,7 @@ class ReceiverInputDStreamSuite
private def runTest(enableWAL: Boolean, body: ReceiverInputDStream[_] => Unit): Unit = {
val conf = new SparkConf()
conf.setMaster("local[4]").setAppName("ReceiverInputDStreamSuite")
- conf.set(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, enableWAL.toString)
+ conf.set(StreamingConf.RECEIVER_WAL_ENABLE_CONF_KEY.key, enableWAL.toString)
require(WriteAheadLogUtils.enableReceiverLog(conf) === enableWAL)
ssc = new StreamingContext(conf, Seconds(1))
val receiverStream = new ReceiverInputDStream[Int](ssc) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]