Repository: spark Updated Branches: refs/heads/master 273b28404 -> c32dbd6bd
[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12. It won't compile for 2.12 otherwise. ## How was this patch tested? Existing tests. Author: Sean Owen <[email protected]> Closes #21955 from srowen/SPARK-18057.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c32dbd6b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c32dbd6b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c32dbd6b Branch: refs/heads/master Commit: c32dbd6bd55cdff4d73408ba5fd6fe18056048fe Parents: 273b284 Author: Sean Owen <[email protected]> Authored: Fri Aug 3 08:17:18 2018 -0500 Committer: Sean Owen <[email protected]> Committed: Fri Aug 3 08:17:18 2018 -0500 ---------------------------------------------------------------------- external/kafka-0-10-sql/pom.xml | 10 +----- external/kafka-0-10/pom.xml | 26 ++++++++++------ .../streaming/kafka010/KafkaRDDSuite.scala | 32 ++++++++++++-------- .../streaming/kafka010/KafkaTestUtils.scala | 12 ++++---- .../kafka010/mocks/MockScheduler.scala | 3 +- .../streaming/kafka010/mocks/MockTime.scala | 10 +++--- 6 files changed, 50 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10-sql/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 9550003..8588e8b 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,6 +29,7 @@ <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <properties> <sbt.project.name>sql-kafka-0-10</sbt.project.name> + <!-- note that this should be compatible with Kafka brokers version 0.10 and up --> <kafka.version>2.0.0</kafka.version> </properties> <packaging>jar</packaging> @@ -128,13 +129,4 @@ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> </build> - <profiles> - <profile> - <id>scala-2.12</id> - <properties> - <kafka.version>0.10.1.1</kafka.version> - </properties> - </profile> - </profiles> - </project> http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 3b124b2..a97fd35 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -28,7 +28,8 @@ <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <properties> <sbt.project.name>streaming-kafka-0-10</sbt.project.name> - <kafka.version>0.10.0.1</kafka.version> + <!-- note that this should be compatible with Kafka brokers version 0.10 and up --> + <kafka.version>2.0.0</kafka.version> </properties> <packaging>jar</packaging> <name>Spark Integration for Kafka 0.10</name> @@ -58,6 +59,20 @@ <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>net.sf.jopt-simple</groupId> @@ -93,13 +108,4 @@ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> </build> - <profiles> - <profile> - <id>scala-2.12</id> - <properties> - <kafka.version>0.10.1.1</kafka.version> - </properties> - </profile> - </profiles> - </project> http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 271adea..3ac6509 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -23,11 +23,11 @@ import java.io.File import scala.collection.JavaConverters._ import scala.util.Random -import kafka.common.TopicAndPartition -import kafka.log._ -import kafka.message._ +import kafka.log.{CleanerConfig, Log, LogCleaner, LogConfig, ProducerStateManager} +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils.Pool import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.BeforeAndAfterAll @@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { val mockTime = new MockTime() - // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api - val logs = new Pool[TopicAndPartition, Log]() + val logs = new Pool[TopicPartition, Log]() val logDir = kafkaTestUtils.brokerLogDir val dir = new File(logDir, topic + "-" + partition) dir.mkdirs() val logProps = new ju.Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f)) + val logDirFailureChannel = new LogDirFailureChannel(1) + val topicPartition = new TopicPartition(topic, partition) val log = new Log( dir, LogConfig(logProps), 0L, + 0L, mockTime.scheduler, - mockTime + new BrokerTopicStats(), + mockTime, + Int.MaxValue, + Int.MaxValue, + topicPartition, + new ProducerStateManager(topicPartition, dir), + logDirFailureChannel ) messages.foreach { case (k, v) => - val msg = new ByteBufferMessageSet( - NoCompressionCodec, - new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue)) - log.append(msg) + val record = new SimpleRecord(k.getBytes, v.getBytes) + log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), 0); } log.roll() - logs.put(TopicAndPartition(topic, partition), log) + logs.put(topicPartition, log) - val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs) + val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel) cleaner.startup() - cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000) + cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000) cleaner.shutdown() mockTime.scheduler.shutdown() http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 70b579d..2315baf 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort() + brokerPort = server.boundPort(brokerConf.interBrokerListenerName) (server, brokerPort) }, new SparkConf(), "KafkaBroker") @@ -222,6 +222,8 @@ private[kafka010] class KafkaTestUtils extends Logging { props.put("zookeeper.connect", zkAddress) props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") + props.put("offsets.topic.replication.factor", "1") + props.put("group.initial.rebalance.delay.ms", "10") props } @@ -270,12 +272,10 @@ private[kafka010] class KafkaTestUtils extends Logging { private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { case Some(partitionState) => - val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr - + val leader = partitionState.basePartitionState.leader + val isr = partitionState.basePartitionState.isr zkUtils.getLeaderForPartition(topic, partition).isDefined && - Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.nonEmpty - + Request.isValidBrokerId(leader) && !isr.isEmpty case _ => false } http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala index 928e1a6..4811d04 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala @@ -21,7 +21,8 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable.PriorityQueue -import kafka.utils.{Scheduler, Time} +import kafka.utils.Scheduler +import org.apache.kafka.common.utils.Time /** * A mock scheduler that executes tasks synchronously using a mock time instance. http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala index a68f94d..8a8646e 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka010.mocks import java.util.concurrent._ -import kafka.utils.Time +import org.apache.kafka.common.utils.Time /** * A class used for unit testing things which depend on the Time interface. @@ -36,12 +36,14 @@ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends def this() = this(System.currentTimeMillis) - def milliseconds: Long = currentMs + override def milliseconds: Long = currentMs - def nanoseconds: Long = + override def hiResClockMs(): Long = milliseconds + + override def nanoseconds: Long = TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) - def sleep(ms: Long) { + override def sleep(ms: Long) { this.currentMs += ms scheduler.tick() } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
