This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5a8aad0  [SPARK-27343][KAFKA][SS] Avoid hardcoded configs for spark-sql-kafka-0-10
5a8aad0 is described below

commit 5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d
Author: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
AuthorDate: Sun May 12 10:46:18 2019 -0500

    [SPARK-27343][KAFKA][SS] Avoid hardcoded configs for spark-sql-kafka-0-10

    ## What changes were proposed in this pull request?

    [SPARK-27343](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27343)

    Based on the previous PR: https://github.com/apache/spark/pull/24270

    Extract the hardcoded option keys and config parameters into named constants
    and ConfigEntry objects. For example, for the parameter
    "spark.kafka.producer.cache.timeout" we build:

    ```
    private[kafka010] val PRODUCER_CACHE_TIMEOUT =
      ConfigBuilder("spark.kafka.producer.cache.timeout")
        .doc("The expire time to remove the unused producers.")
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefaultString("10m")
    ```

    Closes #24574 from hehuiyuan/hehuiyuan-patch-9.

    Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../spark/sql/kafka010/CachedKafkaProducer.scala   |  7 +++---
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  3 ++-
 .../spark/sql/kafka010/KafkaDataConsumer.scala     |  2 +-
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  5 ++--
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  3 ++-
 .../spark/sql/kafka010/KafkaOffsetReader.scala     |  4 ++--
 .../apache/spark/sql/kafka010/KafkaRelation.scala  |  2 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  4 ++--
 .../spark/sql/kafka010/KafkaSourceProvider.scala   | 27 ++++++++++++++--------
 .../org/apache/spark/sql/kafka010/package.scala    | 16 +++++++++++++
 10 files changed, 49 insertions(+), 24 deletions(-)
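For context on the pattern: a ConfigEntry couples a key with a type, a default, and a doc string, so call sites read a typed value instead of re-parsing a string. Below is a minimal sketch of that round trip. ConfigBuilder and the typed SparkConf.get overload are private[spark] APIs, so the sketch must live inside an org.apache.spark package; the object name is hypothetical.

```
package org.apache.spark.sql.kafka010

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical object, mirroring the PRODUCER_CACHE_TIMEOUT entry above.
object ConfigEntryRoundTrip {
  val EXAMPLE_TIMEOUT = ConfigBuilder("spark.kafka.producer.cache.timeout")
    .doc("The expire time to remove the unused producers.")
    .timeConf(TimeUnit.MILLISECONDS)     // "10m", "300s", ... all normalize to ms
    .createWithDefaultString("10m")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    println(conf.get(EXAMPLE_TIMEOUT))   // key unset: default applies, 600000 (ms)

    conf.set("spark.kafka.producer.cache.timeout", "5m")
    println(conf.get(EXAMPLE_TIMEOUT))   // override applies: 300000 (ms)
  }
}
```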
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index 2bab287..ce22e3f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -36,10 +36,9 @@ private[kafka010] object CachedKafkaProducer extends Logging {
 
   private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
 
-  private lazy val cacheExpireTimeout: Long =
-    Option(SparkEnv.get).map(_.conf.getTimeAsMs(
-      "spark.kafka.producer.cache.timeout",
-      s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout)
+  private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get)
+    .map(_.conf.get(PRODUCER_CACHE_TIMEOUT))
+    .getOrElse(defaultCacheExpireTimeout)
 
   private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
     override def load(config: Seq[(String, Object)]): Producer = {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 92686d2..03f82a5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -53,7 +53,8 @@ class KafkaContinuousStream(
     failOnDataLoss: Boolean)
   extends ContinuousStream with Logging {
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs =
+    sourceOptions.getOrElse(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, "512").toLong
 
   // Initialized when creating reader factories. If this diverges from the partitions at the latest
   // offsets, we need to reconfigure.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 45ea3d2..cbb99fd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -523,7 +523,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
   // tasks simultaneously using consumers than the capacity.
   private lazy val cache = {
     val conf = SparkEnv.get.conf
-    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
+    val capacity = conf.get(CONSUMER_CACHE_CAPACITY)
     new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
       override def removeEldestEntry(
           entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 76c7b5d..32d5f92 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -64,10 +64,11 @@ private[kafka010] class KafkaMicroBatchStream(
     failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
 
   private val pollTimeoutMs = options.getLong(
-    "kafkaConsumer.pollTimeoutMs",
+    KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
     SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L)
 
-  private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
+  private val maxOffsetsPerTrigger = Option(options.get(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER))
+    .map(_.toLong)
 
   private val rangeCalculator = KafkaOffsetRangeCalculator(options)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 1af8404..c188b4c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -92,7 +92,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 private[kafka010] object KafkaOffsetRangeCalculator {
 
   def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
-    val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
+    val optionalValue = Option(options.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY))
+      .map(_.toInt)
     new KafkaOffsetRangeCalculator(optionalValue)
   }
 }
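The hunks above standardize on two idioms for reading per-query options through named constants: a typed getter with a default, and Option(...) around a nullable get. A minimal sketch of both against the public CaseInsensitiveStringMap used on the DSv2 code paths; the POLL_TIMEOUT_KEY constant here is hypothetical, following the same convention as KafkaSourceProvider.CONSUMER_POLL_TIMEOUT.

```
import scala.collection.JavaConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

object OptionLookupSketch {
  // Hypothetical constant; the real one lives in KafkaSourceProvider.
  val POLL_TIMEOUT_KEY = "kafkaConsumer.pollTimeoutMs"

  def main(args: Array[String]): Unit = {
    // Lookups are case-insensitive, so the constant's camelCase spelling
    // matches however the user happened to write the option key.
    val options = new CaseInsensitiveStringMap(
      Map("KAFKAConsumer.POLLTimeoutMS" -> "256").asJava)

    val pollTimeoutMs = options.getLong(POLL_TIMEOUT_KEY, 512L)                 // 256
    val maxOffsets = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)  // None
    println((pollTimeoutMs, maxOffsets))
  }
}
```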
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index c64b070..429cbeb 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -89,10 +89,10 @@ private[kafka010] class KafkaOffsetReader(
   }
 
   private val maxOffsetFetchAttempts =
-    readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+    readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
 
   private val offsetFetchAttemptIntervalMs =
-    readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+    readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong
 
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 48cc089..d1ff96a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -47,7 +47,7 @@ private[kafka010] class KafkaRelation(
     "Ending offset not allowed to be set to earliest offsets.")
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
-    "kafkaConsumer.pollTimeoutMs",
+    KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
     (sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index fa93e8f..037c01b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -85,12 +85,12 @@ private[kafka010] class KafkaSource(
   private val sc = sqlContext.sparkContext
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
-    "kafkaConsumer.pollTimeoutMs",
+    KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
     (sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
   private val maxOffsetsPerTrigger =
-    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+    sourceOptions.get(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER).map(_.toLong)
 
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
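From the user's side the refactoring is behavior-preserving: the option strings accepted by the Kafka source are unchanged, only the internal lookups now go through constants. A usage sketch with the documented reader options; the broker address and topic names are placeholders.

```
import org.apache.spark.sql.SparkSession

object KafkaReaderOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-options").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder address
      .option("subscribe", "topicA,topicB")               // SUBSCRIBE strategy
      .option("kafkaConsumer.pollTimeoutMs", "512")       // CONSUMER_POLL_TIMEOUT
      .option("maxOffsetsPerTrigger", "10000")            // MAX_OFFSET_PER_TRIGGER
      .option("fetchOffset.numRetries", "3")              // FETCH_OFFSET_NUM_RETRY
      .load()

    df.printSchema()
  }
}
```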
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c27382d..dbd3310 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -185,11 +185,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 
   private def strategy(caseInsensitiveParams: Map[String, String]) =
       caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
-    case ("assign", value) =>
+    case (ASSIGN, value) =>
       AssignStrategy(JsonUtils.partitions(value))
-    case ("subscribe", value) =>
+    case (SUBSCRIBE, value) =>
       SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
-    case ("subscribepattern", value) =>
+    case (SUBSCRIBE_PATTERN, value) =>
       SubscribePatternStrategy(value.trim())
     case _ =>
       // Should never reach here as we are already matching on
@@ -217,22 +217,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
-      case ("assign", value) =>
+      case (ASSIGN, value) =>
         if (!value.trim.startsWith("{")) {
           throw new IllegalArgumentException(
             "No topicpartitions to assign as specified value for option " +
             s"'assign' is '$value'")
         }
 
-      case ("subscribe", value) =>
+      case (SUBSCRIBE, value) =>
        val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
        if (topics.isEmpty) {
          throw new IllegalArgumentException(
            "No topics to subscribe to as specified value for option " +
            s"'subscribe' is '$value'")
        }
-      case ("subscribepattern", value) =>
-        val pattern = caseInsensitiveParams("subscribepattern").trim()
+      case (SUBSCRIBE_PATTERN, value) =>
+        val pattern = caseInsensitiveParams(SUBSCRIBE_PATTERN).trim()
         if (pattern.isEmpty) {
           throw new IllegalArgumentException(
             "Pattern to subscribe is empty as specified value for option " +
@@ -348,7 +348,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     validateGeneralOptions(caseInsensitiveParams)
 
     // Don't want to throw an error, but at least log a warning.
-    if (caseInsensitiveParams.get("maxoffsetspertrigger").isDefined) {
+    if (caseInsensitiveParams.get(MAX_OFFSET_PER_TRIGGER.toLowerCase(Locale.ROOT)).isDefined) {
       logWarning("maxOffsetsPerTrigger option ignored in batch queries")
     }
   }
@@ -458,11 +458,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 }
 
 private[kafka010] object KafkaSourceProvider extends Logging {
-  private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
+  private val ASSIGN = "assign"
+  private val SUBSCRIBE_PATTERN = "subscribepattern"
+  private val SUBSCRIBE = "subscribe"
+  private val STRATEGY_OPTION_KEYS = Set(SUBSCRIBE, SUBSCRIBE_PATTERN, ASSIGN)
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
-  private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxOffsetsPerTrigger"
+  private[kafka010] val FETCH_OFFSET_NUM_RETRY = "fetchOffset.numRetries"
+  private[kafka010] val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchOffset.retryIntervalMs"
+  private[kafka010] val CONSUMER_POLL_TIMEOUT = "kafkaConsumer.pollTimeoutMs"
   private val GROUP_ID_PREFIX = "groupidprefix"
 
   val TOPIC_OPTION_KEY = "topic"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index 43acd6a..115ec44 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -16,9 +16,25 @@
  */
 package org.apache.spark.sql
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.internal.config.ConfigBuilder
+
 package object kafka010 {   // scalastyle:ignore
   // ^^ scalastyle:ignore is for ignoring warnings about digits in package name
   type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private[kafka010] val PRODUCER_CACHE_TIMEOUT =
+    ConfigBuilder("spark.kafka.producer.cache.timeout")
+      .doc("The expire time to remove the unused producers.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10m")
+
+  private[kafka010] val CONSUMER_CACHE_CAPACITY =
+    ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
+      .doc("The size of consumers cached.")
+      .intConf
+      .createWithDefault(64)
 }
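For completeness, the two new ConfigEntry defaults defined in package.scala can be overridden like any other Spark conf; a sketch follows, with illustrative values only.

```
import org.apache.spark.sql.SparkSession

object KafkaCacheConf {
  def main(args: Array[String]): Unit = {
    // Both entries are read from SparkEnv's conf at runtime, so set them
    // before the session starts (or pass them via spark-submit --conf).
    val spark = SparkSession.builder()
      .appName("kafka-cache-conf")
      .config("spark.kafka.producer.cache.timeout", "5m")      // PRODUCER_CACHE_TIMEOUT
      .config("spark.sql.kafkaConsumerCache.capacity", "128")  // CONSUMER_CACHE_CAPACITY
      .getOrCreate()

    // ... run Kafka queries as usual ...
    spark.stop()
  }
}
```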
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org