Repository: spark
Updated Branches:
  refs/heads/master 273b28404 -> c32dbd6bd


[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 
2.12. It won't compile for 2.12 otherwise.

## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes #21955 from srowen/SPARK-18057.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c32dbd6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c32dbd6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c32dbd6b

Branch: refs/heads/master
Commit: c32dbd6bd55cdff4d73408ba5fd6fe18056048fe
Parents: 273b284
Author: Sean Owen <[email protected]>
Authored: Fri Aug 3 08:17:18 2018 -0500
Committer: Sean Owen <[email protected]>
Committed: Fri Aug 3 08:17:18 2018 -0500

----------------------------------------------------------------------
 external/kafka-0-10-sql/pom.xml                 | 10 +-----
 external/kafka-0-10/pom.xml                     | 26 ++++++++++------
 .../streaming/kafka010/KafkaRDDSuite.scala      | 32 ++++++++++++--------
 .../streaming/kafka010/KafkaTestUtils.scala     | 12 ++++----
 .../kafka010/mocks/MockScheduler.scala          |  3 +-
 .../streaming/kafka010/mocks/MockTime.scala     | 10 +++---
 6 files changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10-sql/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 9550003..8588e8b 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,6 +29,7 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
+    <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
     <kafka.version>2.0.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
@@ -128,13 +129,4 @@
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
   </build>
 
-  <profiles>
-    <profile>
-      <id>scala-2.12</id>
-      <properties>
-        <kafka.version>0.10.1.1</kafka.version>
-      </properties>
-    </profile>
-  </profiles>
-
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 3b124b2..a97fd35 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -28,7 +28,8 @@
   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
+    <kafka.version>2.0.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Integration for Kafka 0.10</name>
@@ -58,6 +59,20 @@
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
@@ -93,13 +108,4 @@
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
   </build>
 
-  <profiles>
-    <profile>
-      <id>scala-2.12</id>
-      <properties>
-        <kafka.version>0.10.1.1</kafka.version>
-      </properties>
-    </profile>
-  </profiles>
-
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 271adea..3ac6509 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -23,11 +23,11 @@ import java.io.File
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import kafka.common.TopicAndPartition
-import kafka.log._
-import kafka.message._
+import kafka.log.{CleanerConfig, Log, LogCleaner, LogConfig, ProducerStateManager}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.scalatest.BeforeAndAfterAll
 
@@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
     val mockTime = new MockTime()
-    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-    val logs = new Pool[TopicAndPartition, Log]()
+    val logs = new Pool[TopicPartition, Log]()
     val logDir = kafkaTestUtils.brokerLogDir
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    val logDirFailureChannel = new LogDirFailureChannel(1)
+    val topicPartition = new TopicPartition(topic, partition)
     val log = new Log(
       dir,
       LogConfig(logProps),
       0L,
+      0L,
       mockTime.scheduler,
-      mockTime
+      new BrokerTopicStats(),
+      mockTime,
+      Int.MaxValue,
+      Int.MaxValue,
+      topicPartition,
+      new ProducerStateManager(topicPartition, dir),
+      logDirFailureChannel
     )
     messages.foreach { case (k, v) =>
-      val msg = new ByteBufferMessageSet(
-        NoCompressionCodec,
-        new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
-      log.append(msg)
+      val record = new SimpleRecord(k.getBytes, v.getBytes)
+      log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, record), 0);
     }
     log.roll()
-    logs.put(TopicAndPartition(topic, partition), log)
+    logs.put(topicPartition, log)
 
-    val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs)
+    val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
     cleaner.startup()
-    cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000)
+    cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)
 
     cleaner.shutdown()
     mockTime.scheduler.shutdown()

http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 70b579d..2315baf 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
       (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")
 
@@ -222,6 +222,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
     props.put("zookeeper.connect", zkAddress)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
+    props.put("offsets.topic.replication.factor", "1")
+    props.put("group.initial.rebalance.delay.ms", "10")
     props
   }
 
@@ -270,12 +272,10 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
-        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
+        val leader = partitionState.basePartitionState.leader
+        val isr = partitionState.basePartitionState.isr
         zkUtils.getLeaderForPartition(topic, partition).isDefined &&
-          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
-          leaderAndInSyncReplicas.isr.nonEmpty
-
+          Request.isValidBrokerId(leader) && !isr.isEmpty
       case _ =>
         false
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
index 928e1a6..4811d04 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.PriorityQueue
 
-import kafka.utils.{Scheduler, Time}
+import kafka.utils.Scheduler
+import org.apache.kafka.common.utils.Time
 
 /**
  * A mock scheduler that executes tasks synchronously using a mock time instance.

http://git-wip-us.apache.org/repos/asf/spark/blob/c32dbd6b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
index a68f94d..8a8646e 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka010.mocks
 
 import java.util.concurrent._
 
-import kafka.utils.Time
+import org.apache.kafka.common.utils.Time
 
 /**
  * A class used for unit testing things which depend on the Time interface.
@@ -36,12 +36,14 @@ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends
 
   def this() = this(System.currentTimeMillis)
 
-  def milliseconds: Long = currentMs
+  override def milliseconds: Long = currentMs
 
-  def nanoseconds: Long =
+  override def hiResClockMs(): Long = milliseconds
+
+  override def nanoseconds: Long =
     TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
 
-  def sleep(ms: Long) {
+  override def sleep(ms: Long) {
     this.currentMs += ms
     scheduler.tick()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to