This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d1bd21a2a219 [SPARK-45502][BUILD] Upgrade Kafka to 3.6.0
d1bd21a2a219 is described below
commit d1bd21a2a219ebe6c5ac3fcb1e17db75af3c670c
Author: dengziming <[email protected]>
AuthorDate: Thu Oct 12 08:47:25 2023 -0700
[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0
### What changes were proposed in this pull request?
Upgrade Apache Kafka from 3.4.1 to 3.6.0
### Why are the changes needed?
- https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html
- https://downloads.apache.org/kafka/3.5.1/RELEASE_NOTES.html
- https://archive.apache.org/dist/kafka/3.5.0/RELEASE_NOTES.html
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GitHub CI.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43348 from dengziming/kafka-3.6.0.
Authored-by: dengziming <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 4 ++--
.../spark/streaming/kafka010/KafkaRDDSuite.scala | 16 ++++++++------
.../spark/streaming/kafka010/KafkaTestUtils.scala | 4 ++--
.../streaming/kafka010/mocks/MockScheduler.scala | 25 +++++++++++-----------
pom.xml | 2 +-
5 files changed, 26 insertions(+), 25 deletions(-)
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c54afc6290b1..2b0c13ed443d 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -28,7 +28,6 @@ import scala.io.Source
import scala.jdk.CollectionConverters._
import com.google.common.io.Files
-import kafka.api.Request
import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.zk.KafkaZkClient
@@ -40,6 +39,7 @@ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT,
SASL_PLAINTEXT}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.SystemTime
@@ -597,7 +597,7 @@ class KafkaTestUtils(
.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
zkClient.getLeaderForPartition(new TopicPartition(topic,
partition)).isDefined &&
- Request.isValidBrokerId(partitionState.leader) &&
+ FetchRequest.isValidBrokerId(partitionState.leader) &&
!partitionState.replicas.isEmpty
case _ =>
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 735ec2f7b448..ae941b1fddd5 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -24,12 +24,14 @@ import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Random
-import kafka.log.{CleanerConfig, LogCleaner, LogConfig,
ProducerStateManagerConfig, UnifiedLog}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.log.{LogCleaner, UnifiedLog}
+import kafka.server.BrokerTopicStats
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
SimpleRecord}
import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark._
@@ -90,13 +92,13 @@ class KafkaRDDSuite extends SparkFunSuite {
val dir = new File(logDir, topic + "-" + partition)
dir.mkdirs()
val logProps = new ju.Properties()
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- logProps.put(LogConfig.MinCleanableDirtyRatioProp,
java.lang.Float.valueOf(0.1f))
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT)
+ logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG,
java.lang.Float.valueOf(0.1f))
val logDirFailureChannel = new LogDirFailureChannel(1)
val topicPartition = new TopicPartition(topic, partition)
val producerIdExpirationMs = Int.MaxValue
- val producerStateManagerConfig = new
ProducerStateManagerConfig(producerIdExpirationMs)
- val logConfig = LogConfig(logProps)
+ val producerStateManagerConfig = new
ProducerStateManagerConfig(producerIdExpirationMs, false)
+ val logConfig = new LogConfig(logProps)
val log = UnifiedLog(
dir,
logConfig,
@@ -120,7 +122,7 @@ class KafkaRDDSuite extends SparkFunSuite {
log.roll()
logs.put(topicPartition, log)
- val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs,
logDirFailureChannel)
+ val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs,
logDirFailureChannel)
cleaner.startup()
cleaner.awaitCleaned(new TopicPartition(topic, partition),
log.activeSegment.baseOffset, 1000)
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 6a9ef52e990e..1bd9b8bc3160 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -27,12 +27,12 @@ import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
-import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{Time => KTime}
import org.apache.zookeeper.client.ZKClientConfig
@@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
val leader = partitionState.leader
val isr = partitionState.isr
zkClient.getLeaderForPartition(new TopicPartition(topic,
partition)).isDefined &&
- Request.isValidBrokerId(leader) && !isr.isEmpty
+ FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
case _ =>
false
}
diff --git
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
index c0724909bc35..1b7e92a03604 100644
---
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
+++
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks
import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import kafka.utils.Scheduler
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.util.Scheduler
import org.jmock.lib.concurrent.DeterministicScheduler
/**
@@ -42,8 +42,6 @@ private[kafka010] class MockScheduler(val time: Time) extends
Scheduler {
val scheduler = new DeterministicScheduler()
- def isStarted: Boolean = true
-
def startup(): Unit = {}
def shutdown(): Unit = synchronized {
@@ -56,17 +54,18 @@ private[kafka010] class MockScheduler(val time: Time)
extends Scheduler {
def schedule(
name: String,
- fun: () => Unit,
- delay: Long = 0,
- period: Long = -1,
- unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] =
synchronized {
- val runnable = new Runnable {
- override def run(): Unit = fun()
- }
- if (period >= 0) {
- scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
+ task: Runnable,
+ delayMs: Long = 0,
+ periodMs: Long = -1): ScheduledFuture[_] = synchronized {
+ if (periodMs >= 0) {
+ scheduler.scheduleAtFixedRate(task, delayMs, periodMs,
TimeUnit.MILLISECONDS)
} else {
- scheduler.schedule(runnable, delay, unit)
+ scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
}
}
+
+ override def resizeThreadPool(i: Int): Unit = {
+
+ }
+
}
diff --git a/pom.xml b/pom.xml
index e3a19257c8c1..d959d0dc0ff0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10
and up -->
- <kafka.version>3.4.1</kafka.version>
+ <kafka.version>3.6.0</kafka.version>
<!-- After 10.15.1.3, the minimum required version is JDK9 -->
<derby.version>10.14.2.0</derby.version>
<parquet.version>1.13.1</parquet.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]