[kafka] branch trunk updated: KAFKA-6624; Prevent concurrent log flush and log deletion (#4663)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1ea07b9 KAFKA-6624; Prevent concurrent log flush and log deletion (#4663) 1ea07b9 is described below commit 1ea07b993d75ed68f4c04282eb177bf84156e0b2 Author: Dong Lin <lindon...@users.noreply.github.com> AuthorDate: Mon Mar 12 22:20:44 2018 -0700 KAFKA-6624; Prevent concurrent log flush and log deletion (#4663) KAFKA-6624; Prevent concurrent log flush and log deletion Reviewers: Ted Yu <yuzhih...@gmail.com>, Jun Rao <jun...@gmail.com> --- core/src/main/scala/kafka/log/LogManager.scala | 18 +- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 9ae93aa..7aa5bcd 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -75,7 +75,8 @@ class LogManager(logDirs: Seq[File], // from one log directory to another log directory on the same broker. The directory of the future log will be renamed // to replace the current log of the partition after the future log catches up with the current log private val futureLogs = new Pool[TopicPartition, Log]() - private val logsToBeDeleted = new LinkedBlockingQueue[Log]() + // Each element in the queue contains the log object to be deleted and the time it is scheduled for deletion. + private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]() private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs) @volatile var currentDefaultConfig = initialDefaultConfig @@ -240,6 +241,10 @@ class LogManager(logDirs: Seq[File], } } + private def addLogToBeDeleted(log: Log): Unit = { +this.logsToBeDeleted.add((log, time.milliseconds())) + } + private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long]): Unit = { debug("Loading log '" + logDir.getName + "'") val topicPartition = Log.parseTopicPartitionName(logDir) @@ -260,7 +265,7 @@ class LogManager(logDirs: Seq[File], logDirFailureChannel = logDirFailureChannel) if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { - this.logsToBeDeleted.add(log) + addLogToBeDeleted(log) } else { val previous = { if (log.isFuture) @@ -704,9 +709,12 @@ class LogManager(logDirs: Seq[File], private def deleteLogs(): Unit = { try { while (!logsToBeDeleted.isEmpty) { -val removedLog = logsToBeDeleted.take() +val (removedLog, scheduleTimeMs) = logsToBeDeleted.take() if (removedLog != null) { try { +val waitingTimeMs = scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds() +if (waitingTimeMs > 0) + Thread.sleep(waitingTimeMs) removedLog.delete() info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.") } catch { @@ -767,7 +775,7 @@ class LogManager(logDirs: Seq[File], sourceLog.close() checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile) checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile) -logsToBeDeleted.add(sourceLog) +addLogToBeDeleted(sourceLog) } catch { case e: KafkaStorageException => // If sourceLog's log directory is offline, we need close its handlers here. @@ -805,7 +813,7 @@ class LogManager(logDirs: Seq[File], removedLog.renameDir(Log.logDeleteDirName(topicPartition)) checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile) checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) - logsToBeDeleted.add(removedLog) + addLogToBeDeleted(removedLog) info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") } else if (offlineLogDirs.nonEmpty) { throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(",")) -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] branch 1.1 updated: MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 5778059 MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672) 5778059 is described below commit 5778059a91ff3144c228eb4c12f725c89e1e87a6 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri Mar 9 23:18:23 2018 + MINOR: Fix deadlock in ZooKeeperClient.close() on session expiry (#4672) Reviewers: Jun Rao <jun...@gmail.com> --- .../scala/kafka/zookeeper/ZooKeeperClient.scala| 37 ++ .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 22 + 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index efbd6e8..74a3a2d 100644 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -301,13 +301,17 @@ class ZooKeeperClient(connectString: String, stateChangeHandlers.remove(name) } - def close(): Unit = inWriteLock(initializationLock) { + def close(): Unit = { info("Closing.") -zNodeChangeHandlers.clear() -zNodeChildChangeHandlers.clear() -stateChangeHandlers.clear() -zooKeeper.close() -metricNames.foreach(removeMetric(_)) +inWriteLock(initializationLock) { + zNodeChangeHandlers.clear() + zNodeChildChangeHandlers.clear() + stateChangeHandlers.clear() + zooKeeper.close() + metricNames.foreach(removeMetric(_)) +} +// Shutdown scheduler outside of lock to avoid deadlock if scheduler +// is waiting for lock to process session expiry expiryScheduler.shutdown() info("Closed.") } @@ -348,6 +352,18 @@ class ZooKeeperClient(connectString: String, initialize() } + // Visibility for testing + private[zookeeper] def scheduleSessionExpiryHandler(): Unit = { +expiryScheduler.schedule("zk-session-expired", () => { + inWriteLock(initializationLock) { +info("Session expired.") +stateChangeHandlers.values.foreach(_.beforeInitializingSession()) +initialize() +stateChangeHandlers.values.foreach(_.afterInitializingSession()) + } +}, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS) + } + // package level visibility for testing only private[zookeeper] object ZooKeeperClientWatcher extends Watcher { override def process(event: WatchedEvent): Unit = { @@ -363,14 +379,7 @@ class ZooKeeperClient(connectString: String, error("Auth failed.") stateChangeHandlers.values.foreach(_.onAuthFailure()) } else if (state == KeeperState.Expired) { -expiryScheduler.schedule("zk-session-expired", () => { - inWriteLock(initializationLock) { -info("Session expired.") - stateChangeHandlers.values.foreach(_.beforeInitializingSession()) -initialize() - stateChangeHandlers.values.foreach(_.afterInitializingSession()) - } -}, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS) +scheduleSessionExpiryHandler() } case Some(path) => (event.getType: @unchecked) match { diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index 2e0651c..77e11ea 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -486,6 +486,28 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted) } + @Test + def testSessionExpiryDuringClose(): Unit = { +val semaphore = new Semaphore(0) +val closeExecutor = Executors.newSingleThreadExecutor +try { + zooKeeperClient.expiryScheduler.schedule("test", () => semaphore.acquireUninterruptibly(), +delay = 0, period = -1, TimeUnit.SECONDS) + zooKeeperClient.scheduleSessionExpiryHandler() + val closeFuture = closeExecutor.submit(new Runnable { +override def run(): Unit = { + zooKeeperClient.close() +} + }) + assertFalse("Close completed without shutting down expiry scheduler gracefully", closeFuture.isDone) + semaphore.release() + closeFuture.get(10, TimeUnit.SECONDS) + assertFalse("Expiry executor not shutdown", zooKeeperClient.expiryScheduler.isStarted) +} finally { + closeExecutor.shutdownNow() +} + } + def isExpected
[kafka] branch trunk updated: KAFKA-6752: Enable unclean leader election metric (#4838)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e29fa9a KAFKA-6752: Enable unclean leader election metric (#4838) e29fa9a is described below commit e29fa9a4ca72e6eae0a4b75ef30f2df4ffec943d Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Wed Apr 11 23:00:30 2018 +0530 KAFKA-6752: Enable unclean leader election metric (#4838) Reviewers: Jun Rao <jun...@gmail.com> --- .../kafka/controller/PartitionStateMachine.scala | 9 +--- .../PartitionLeaderElectionAlgorithmsTest.scala| 26 -- .../integration/UncleanLeaderElectionTest.scala| 14 ++-- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 74bc59f..6805e32 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig, if (leaderIsrAndControllerEpochOpt.nonEmpty) { val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr -val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled) +val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) @@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig, } object PartitionLeaderElectionAlgorithms { - def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = { + def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { -assignment.find(liveReplicas.contains) +val leaderOpt = assignment.find(liveReplicas.contains) +if (!leaderOpt.isEmpty) + controllerContext.stats.uncleanLeaderElectionRate.mark() +leaderOpt } else { None } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala index f149fc9..113a39d 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala @@ -17,10 +17,17 @@ package kafka.controller import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { + private var controllerContext: ControllerContext = null + + @Before + def setUp(): Unit = { +controllerContext = new ControllerContext +controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec") + } @Test def testOfflinePartitionLeaderElection(): Unit = { @@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(Option(4), leaderOpt) } @@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(None, leaderOpt) +assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count()) } + @Test def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = { val assignment = Seq(2, 4) @@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartiti
[kafka] branch 1.1 updated: KAFKA-6752: Enable unclean leader election metric (#4838)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 129859d KAFKA-6752: Enable unclean leader election metric (#4838) 129859d is described below commit 129859d0f7bf6cfe394706d6b69a18be510b32d8 Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Wed Apr 11 23:00:30 2018 +0530 KAFKA-6752: Enable unclean leader election metric (#4838) Reviewers: Jun Rao <jun...@gmail.com> --- .../kafka/controller/PartitionStateMachine.scala | 9 +--- .../PartitionLeaderElectionAlgorithmsTest.scala| 26 -- .../integration/UncleanLeaderElectionTest.scala| 14 ++-- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 2e27272..d760061 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig, if (leaderIsrAndControllerEpochOpt.nonEmpty) { val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr -val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled) +val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) @@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig, } object PartitionLeaderElectionAlgorithms { - def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = { + def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { -assignment.find(liveReplicas.contains) +val leaderOpt = assignment.find(liveReplicas.contains) +if (!leaderOpt.isEmpty) + controllerContext.stats.uncleanLeaderElectionRate.mark() +leaderOpt } else { None } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala index f149fc9..113a39d 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala @@ -17,10 +17,17 @@ package kafka.controller import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { + private var controllerContext: ControllerContext = null + + @Before + def setUp(): Unit = { +controllerContext = new ControllerContext +controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec") + } @Test def testOfflinePartitionLeaderElection(): Unit = { @@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(Option(4), leaderOpt) } @@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(None, leaderOpt) +assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count()) } + @Test def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = { val assignment = Seq(2, 4) @@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assi
[kafka] branch trunk updated: KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 47918f2 KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427) 47918f2 is described below commit 47918f2d79e907f6a6da599ab82a97c16979 Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Wed Apr 11 23:18:04 2018 +0530 KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) (#4427) Reviewers: Jun Rao <jun...@gmail.com> --- build.gradle | 1 + checkstyle/suppressions.xml| 2 +- .../apache/kafka/clients/admin/AdminClient.java| 154 + .../admin/CreateDelegationTokenOptions.java| 53 +++ .../clients/admin/CreateDelegationTokenResult.java | 43 ++ .../admin/DescribeDelegationTokenOptions.java | 48 +++ .../admin/DescribeDelegationTokenResult.java | 45 ++ .../admin/ExpireDelegationTokenOptions.java} | 26 ++-- .../admin/ExpireDelegationTokenResult.java}| 29 ++-- .../kafka/clients/admin/KafkaAdminClient.java | 137 ++ .../admin/RenewDelegationTokenOptions.java}| 26 ++-- .../admin/RenewDelegationTokenResult.java} | 29 ++-- .../kafka/common/network/ChannelBuilders.java | 2 +- .../kafka/common/network/SaslChannelBuilder.java | 2 +- .../requests/DescribeDelegationTokenResponse.java | 4 + .../requests/ExpireDelegationTokenRequest.java | 4 +- .../requests/ExpireDelegationTokenResponse.java| 4 + .../requests/RenewDelegationTokenRequest.java | 4 +- .../requests/RenewDelegationTokenResponse.java | 4 + .../security/scram/internal/ScramSaslServer.java | 2 +- .../scram/internal/ScramServerCallbackHandler.java | 4 +- .../security/token/delegation/DelegationToken.java | 11 +- .../token/delegation/TokenInformation.java | 6 + .../{ => internal}/DelegationTokenCache.java | 4 +- .../DelegationTokenCredentialCallback.java | 2 +- .../kafka/clients/admin/MockAdminClient.java | 20 +++ .../apache/kafka/common/network/NioEchoServer.java | 2 +- .../kafka/common/requests/RequestResponseTest.java | 4 +- .../scram/internal/ScramSaslServerTest.java| 2 +- core/src/main/scala/kafka/admin/AdminClient.scala | 29 .../scala/kafka/admin/DelegationTokenCommand.scala | 88 ++-- .../scala/kafka/security/CredentialProvider.scala | 2 +- .../kafka/server/DelegationTokenManager.scala | 3 +- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../DelegationTokenEndToEndAuthorizationTest.scala | 8 +- .../kafka/admin/DelegationTokenCommandTest.scala | 147 .../delegation/DelegationTokenManagerTest.scala| 3 +- .../DelegationTokenRequestsOnPlainTextTest.scala | 27 ++-- .../kafka/server/DelegationTokenRequestsTest.scala | 102 -- ...nTokenRequestsWithDisableTokenFeatureTest.scala | 32 ++--- .../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +- 41 files changed, 907 insertions(+), 214 deletions(-) diff --git a/build.gradle b/build.gradle index f836980..69f560e 100644 --- a/build.gradle +++ b/build.gradle @@ -858,6 +858,7 @@ project(':clients') { include "**/org/apache/kafka/common/config/*" include "**/org/apache/kafka/common/security/auth/*" include "**/org/apache/kafka/server/policy/*" +include "**/org/apache/kafka/common/security/token/delegation/*" } } diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 0fec810..2767132 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -10,7 +10,7 @@ + files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/> recordsToDelete, DeleteRecordsOptions options); + +/** + * Create a Delegation Token. + * + * This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options. + * See the overload for more details. + * + * @return The CreateDelegationTokenResult. + */ +public CreateDelegationTokenResult createDelegationToken() { +return createDelegationToken(new CreateDelegationTokenOptions()); +} + + +/** + * Create a Delegation Token. + * + * This operation is supported by brokers with version 1.1.0 or higher. + * + * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + *
[kafka] branch trunk updated: KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info (#4825)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 341db99 KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info (#4825) 341db99 is described below commit 341db990dc4e2acb207622c7fa254f07650742bc Author: gitlw <lucasatu...@gmail.com> AuthorDate: Mon Apr 16 17:16:08 2018 -0700 KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info (#4825) A partially deleted topic can end up with some partitions having no leadership info. For the partially deleted topic, a new controller should be able to finish the topic deletion by transitioning the rogue partition's replicas to OfflineReplica state. This patch adds logic to transition replicas to OfflineReplica state whose partitions have no leadership info. Added a new test method to cover the partially deleted topic case. Reviewers: Jun Rao <jun...@gmail.com> --- .../kafka/controller/ReplicaStateMachine.scala | 11 +- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +- .../scala/unit/kafka/admin/DeleteTopicTest.scala | 43 +++--- .../kafka/controller/ReplicaStateMachineTest.scala | 2 +- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index a2d04e6..5fafcc4 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig, controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false, (_, _) => ()) } -val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)) -val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition)) +val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica => + controllerContext.partitionLeadershipInfo.contains(replica.topicPartition) +} +val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition)) updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) => if (!topicDeletionManager.isPartitionToBeDeleted(partition)) { val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId) @@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig, logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica) replicaState.put(replica, OfflineReplica) } + +replicasWithoutLeadershipInfo.foreach { replica => + logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), OfflineReplica) + replicaState.put(replica, OfflineReplica) +} case ReplicaDeletionStarted => validReplicas.foreach { replica => logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 9b58fc7..a65128a 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean * @return true if path gets deleted successfully, false if root path doesn't exist * @throws KeeperException if there is an error while deleting the znodes */ - private[zk] def deleteRecursive(path: String): Boolean = { + def deleteRecursive(path: String): Boolean = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path)) getChildrenResponse.resultCode match { case Code.OK => diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index ef455d4..4c033c4 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -17,7 +17,7 @@ package kafka.admin import kafka.log.Log -import kafka.zk.ZooKeeperTestHarness +import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness} import kafka.utils.TestUtils import kafka.server.{KafkaConfig, KafkaServer} import org.junit.Assert._ @@ -3
[kafka] branch trunk updated: KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2ef6ee2 KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668) 2ef6ee2 is described below commit 2ef6ee2338178c7501f5bd4c7cce5f4cea9d3e17 Author: gitlw <lucasatu...@gmail.com> AuthorDate: Thu Mar 29 22:08:28 2018 -0700 KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668) Reviewed by Jun Rao <jun...@gmail.com> --- .../scala/kafka/controller/ControllerContext.scala | 90 .../scala/kafka/controller/KafkaController.scala | 117 ++--- .../kafka/controller/PartitionStateMachine.scala | 2 +- .../kafka/controller/ReplicaStateMachine.scala | 7 +- .../kafka/controller/TopicDeletionManager.scala| 3 +- .../controller/PartitionStateMachineTest.scala | 16 +-- .../kafka/controller/ReplicaStateMachineTest.scala | 12 +-- 7 files changed, 148 insertions(+), 99 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 541bce8..f4671cf 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -31,14 +31,46 @@ class ControllerContext { var epoch: Int = KafkaController.InitialControllerEpoch - 1 var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1 var allTopics: Set[String] = Set.empty - var partitionReplicaAssignment: mutable.Map[TopicPartition, Seq[Int]] = mutable.Map.empty - var partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty + private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty + val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty + def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = { +partitionReplicaAssignmentUnderlying.getOrElse(topicPartition.topic, mutable.Map.empty) + .getOrElse(topicPartition.partition, Seq.empty) + } + + private def clearTopicsState(): Unit = { +allTopics = Set.empty +partitionReplicaAssignmentUnderlying.clear() +partitionLeadershipInfo.clear() +partitionsBeingReassigned.clear() +replicasOnOfflineDirs.clear() + } + + def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = { +partitionReplicaAssignmentUnderlying.getOrElseUpdate(topicPartition.topic, mutable.Map.empty) + .put(topicPartition.partition, newReplicas) + } + + def partitionReplicaAssignmentForTopic(topic : String): Map[TopicPartition, Seq[Int]] = { +partitionReplicaAssignmentUnderlying.getOrElse(topic, Map.empty).map { + case (partition, replicas) => (new TopicPartition(topic, partition), replicas) +}.toMap + } + + def allPartitions: Set[TopicPartition] = { +partitionReplicaAssignmentUnderlying.flatMap { + case (topic, topicReplicaAssignment) => topicReplicaAssignment.map { +case (partition, _) => new TopicPartition(topic, partition) + } +}.toSet + } + // setter def liveBrokers_=(brokers: Set[Broker]) { liveBrokersUnderlying = brokers @@ -53,8 +85,12 @@ class ControllerContext { def liveOrShuttingDownBrokers = liveBrokersUnderlying def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = { -partitionReplicaAssignment.collect { - case (topicPartition, replicas) if replicas.contains(brokerId) => topicPartition +partitionReplicaAssignmentUnderlying.flatMap { + case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter { +case (_, replicas) => replicas.contains(brokerId) + }.map { +case (partition, _) => new TopicPartition(topic, partition) + } }.toSet } @@ -68,22 +104,26 @@ class ControllerContext { def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = { brokerIds.flatMap { brokerId => - partitionReplicaAssignment.collect { case (topicPartition, replicas) if replicas.contains(brokerId) => -PartitionAndReplica(topicPartition, brokerId) + partitionReplicaAssignmentUnderlying.flatMap { +case (topic, topicReplicaAssignment) => topicReplicaAssign
[kafka] 09/09: removed method updatedLeaderIsrAndControllerEpochs
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit 8100fd92afd6c4615bf7a6998a8a8eeb0d628d4e Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Thu Mar 1 14:47:47 2018 -0800 removed method updatedLeaderIsrAndControllerEpochs --- core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +++- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 9590ecd..e44c2c9 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -700,8 +700,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] = leaderIsrAndControllerEpochs(0, 0) - private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] = -leaderIsrAndControllerEpochs(state, state - 1) val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr) private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] = @@ -843,18 +841,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)), - zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map { + zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map { eraseMetadataAndStat}.toList) val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) assertEquals(2, getResponses.size) -topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)} +topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)} // Other ZK client can also write the state of a partition assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)), - otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map { + otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map { eraseMetadataAndStat}.toList) } -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] branch trunk updated (6cfcc9d -> 8100fd9)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 6cfcc9d KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625) new 0103103 Fix in updateBrokerInfoInZk, exception is thrown if response was not OK. new af598af Fix in deleteLogDirEventNotifications - use correct path for deleted children new d0eb552 Additional tests to improve test coverage of KafkaZkClient. new 54a3220 Use move otherZkClient to KafkaZkClientTest new 155ac59 Undo unintentional whitespace change in ZooKeeperTestHarness new ae51a15 Fixes requested by reviewer new 1ae4c1d Minor fixes requested by reviewer new cacd377 Changes requeted by reviewer new 8100fd9 removed method updatedLeaderIsrAndControllerEpochs The 9 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 576 - 2 files changed, 555 insertions(+), 26 deletions(-) -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] 02/09: Fix in deleteLogDirEventNotifications - use correct path for deleted children
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit af598af20828149bdd45336377e70d26360452fe Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Mon Feb 19 16:51:38 2018 +0100 Fix in deleteLogDirEventNotifications - use correct path for deleted children --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 145e294..851c686 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -426,7 +426,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] branch trunk updated: KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5d87b92 KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests; 5d87b92 is described below commit 5d87b926d50823004a96a97062e0866d92d07b41 Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Thu Mar 1 18:12:52 2018 -0800 KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests; --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 576 - 2 files changed, 555 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6545fde..d61b281 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) -retryRequestUntilConnected(setDataRequest) +val response = retryRequestUntilConnected(setDataRequest) +response.maybeThrow() info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } @@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d3726c2..e44c2c9 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,10 +16,11 @@ */ package kafka.zk -import java.util.{Properties, UUID} +import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.{CountDownLatch, TimeUnit} -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.log.LogConfig import kafka.security.auth._ @@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation -import org.apache.kafka.common.utils.SecurityUtils -import org.apache.zookeeper.KeeperException.NodeExistsException +import org.apache.kafka.common.utils.{SecurityUtils, Time} +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ -import org.junit.Test - +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.{Seq, mutable} import scala.util.Random +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper._ +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.data.Stat + class KafkaZkClientTest extends ZooKeeperTestHarness { private val group = "my-group" + private val topic1 = "topic1" + private val topic2 = "topic2" + + val topicPartition10 = new TopicPartition(topic1, 0) + val topicPartition11 = new TopicPartition(topic1, 1) + val topicPartition20 = new TopicPartition(topic2, 0) + val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + + var otherZkClient: KafkaZkClient = _ + + @Before + override def setUp(): Unit = { +super.setUp() +otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + } + + @After + override def tearDown(): Unit = { +if (otherZkClient != null) + otherZkClient.close() +super.tearDown() + } + private val topicPartition = new TopicPartition("topic", 0) @Test @@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTe
[kafka] branch 1.1 updated: KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 873b28e KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests; 873b28e is described below commit 873b28ea95345510365959c94d0c5fbf9756ca86 Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Thu Mar 1 18:12:52 2018 -0800 KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests; --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 576 - 2 files changed, 555 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6f2e79e..a5b6e25 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) -retryRequestUntilConnected(setDataRequest) +val response = retryRequestUntilConnected(setDataRequest) +response.maybeThrow() info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } @@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d3726c2..e44c2c9 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,10 +16,11 @@ */ package kafka.zk -import java.util.{Properties, UUID} +import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.{CountDownLatch, TimeUnit} -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.log.LogConfig import kafka.security.auth._ @@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation -import org.apache.kafka.common.utils.SecurityUtils -import org.apache.zookeeper.KeeperException.NodeExistsException +import org.apache.kafka.common.utils.{SecurityUtils, Time} +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ -import org.junit.Test - +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.{Seq, mutable} import scala.util.Random +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper._ +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.data.Stat + class KafkaZkClientTest extends ZooKeeperTestHarness { private val group = "my-group" + private val topic1 = "topic1" + private val topic2 = "topic2" + + val topicPartition10 = new TopicPartition(topic1, 0) + val topicPartition11 = new TopicPartition(topic1, 1) + val topicPartition20 = new TopicPartition(topic2, 0) + val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + + var otherZkClient: KafkaZkClient = _ + + @Before + override def setUp(): Unit = { +super.setUp() +otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + } + + @After + override def tearDown(): Unit = { +if (otherZkClient != null) + otherZkClient.close() +super.tearDown() + } + private val topicPartition = new TopicPartition("topic", 0) @Test @@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTe
[kafka] 04/09: Use move otherZkClient to KafkaZkClientTest
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit 54a32205b0fa7545d450104bc28aacf153b70cf6 Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Tue Feb 20 09:03:56 2018 +0100 Use move otherZkClient to KafkaZkClientTest --- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 22 -- .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++-- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 9329430..28dbb73 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -19,6 +19,7 @@ package kafka.zk import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.{CountDownLatch, TimeUnit} +import javax.security.auth.login.Configuration import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} @@ -30,10 +31,10 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation -import org.apache.kafka.common.utils.SecurityUtils +import org.apache.kafka.common.utils.{SecurityUtils, Time} import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ -import org.junit.Test +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.{Seq, mutable} @@ -42,6 +43,7 @@ import scala.util.Random import kafka.controller.LeaderIsrAndControllerEpoch import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zookeeper._ +import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.data.Stat class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -55,6 +57,22 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val topicPartition20 = new TopicPartition(topic2, 0) val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + var otherZkClient: KafkaZkClient = null + + @Before + override def setUp() { +super.setUp() +otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + } + + @After + override def tearDown() { +if (otherZkClient != null) + otherZkClient.close() +super.tearDown() + } + private val topicPartition = new TopicPartition("topic", 0) @Test diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index f9cb8e3..af2d53a 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,16 +19,16 @@ package kafka.zk import javax.security.auth.login.Configuration -import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} +import kafka.utils.{CoreUtils, Logging, TestUtils} import org.junit.{After, AfterClass, Before, BeforeClass} import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.test.IntegrationTest import org.junit.experimental.categories.Category + import scala.collection.Set import scala.collection.JavaConverters._ - import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.consumer.internals.AbstractCoordinator import kafka.controller.ControllerEventManager @@ -45,33 +45,25 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { protected val zkAclsEnabled: Option[Boolean] = None var zkClient: KafkaZkClient = null - var otherZkClient: KafkaZkClient = null var adminZkClient: AdminZkClient = null var zookeeper: EmbeddedZookeeper = null def zkPort: Int = zookeeper.port def zkConnect: String = s"127.0.0.1:$zkPort" - + @Before def setUp() { zookeeper = new EmbeddedZookeeper() -zkClient = createZkClient -otherZkClient = createZkClient -adminZkClient = new AdminZkClient(zkClient) - } - - protected def createZkClient = { -KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, +zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) +adminZkClient = new AdminZkClient(zkClient) } @After def tearDown() { if (zkClient != n
[kafka] 03/09: Additional tests to improve test coverage of KafkaZkClient.
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit d0eb552a891ecd99150ddf4b89985525806c6e31 Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Mon Feb 19 16:52:46 2018 +0100 Additional tests to improve test coverage of KafkaZkClient. --- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 558 - .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 18 +- 2 files changed, 547 insertions(+), 29 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d3726c2..9329430 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,10 +16,11 @@ */ package kafka.zk -import java.util.{Properties, UUID} +import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.{CountDownLatch, TimeUnit} -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.log.LogConfig import kafka.security.auth._ @@ -30,16 +31,30 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation import org.apache.kafka.common.utils.SecurityUtils -import org.apache.zookeeper.KeeperException.NodeExistsException +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ import org.junit.Test - import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.{Seq, mutable} import scala.util.Random +import kafka.controller.LeaderIsrAndControllerEpoch +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper._ +import org.apache.zookeeper.data.Stat + class KafkaZkClientTest extends ZooKeeperTestHarness { private val group = "my-group" + private val topic1 = "topic1" + private val topic2 = "topic2" + + val topicPartition10 = new TopicPartition(topic1, 0) + val topicPartition11 = new TopicPartition(topic1, 1) + val topicPartition20 = new TopicPartition(topic2, 0) + val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + private val topicPartition = new TopicPartition("topic", 0) @Test @@ -90,17 +105,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testTopicAssignmentMethods() { -val topic1 = "topic1" -val topic2 = "topic2" +assertTrue(zkClient.getAllTopicsInCluster.isEmpty) // test with non-existing topic +assertFalse(zkClient.topicExists(topic1)) assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty) assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty) assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty) assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty) +val topicPartition = new TopicPartition(topic1, 0) val assignment = Map( - new TopicPartition(topic1, 0) -> Seq(0, 1), + topicPartition -> Seq(0, 1), new TopicPartition(topic1, 1) -> Seq(0, 1), new TopicPartition(topic1, 2) -> Seq(1, 2, 3) ) @@ -108,6 +124,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { // create a topic assignment zkClient.createTopicAssignment(topic1, assignment) +assertTrue(zkClient.topicExists(topic1)) + val expectedAssignment = assignment map { topicAssignment => val partition = topicAssignment._1.partition val assignment = topicAssignment._2 @@ -215,6 +233,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test + def testIsrChangeNotificationGetters(): Unit = { +assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications) +assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("00"))) + +zkClient.createRecursive("/isr_change_notification") + +zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11)) +zkClient.propagateIsrChanges(Set(topicPartition10)) + +assertEquals(Set("00", "01"), zkClient.getAllIsrChangeNotifications.toSet) + +// A partition can have multiple notifications +assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10), + zkClient.getPartitionsFromIsrChangeNotifications(Seq("00", "01"))) + } + + @Test + def testIsrChangeNotificationsDeletion():
[kafka] 07/09: Minor fixes requested by reviewer
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit 1ae4c1d6095a86266ba6d008711570df0eaf5682 Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Tue Feb 20 21:21:44 2018 +0100 Minor fixes requested by reviewer --- core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 5cbb76e..d6826a6 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -1039,4 +1039,4 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.deleteDelegationToken(tokenId)) assertEquals(None, zkClient.getDelegationTokenInfo(tokenId)) } -} \ No newline at end of file +} -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] 01/09: Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit 010310388725d6393a73e12c02dff4bb85cf2518 Author: Sandor Murakozi <smurak...@gmail.com> AuthorDate: Mon Feb 19 16:49:15 2018 +0100 Fix in updateBrokerInfoInZk, exception is thrown if response was not OK. --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6545fde..145e294 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -88,7 +88,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) -retryRequestUntilConnected(setDataRequest) +val response = retryRequestUntilConnected(setDataRequest) + if (response.resultCode != Code.OK) + throw KeeperException.create(response.resultCode) info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] 01/09: Revert "removed method updatedLeaderIsrAndControllerEpochs"
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit 720e22677b94e3607f002577f091c23714bcd515 Author: Jun Rao <jun...@gmail.com> AuthorDate: Thu Mar 1 18:02:07 2018 -0800 Revert "removed method updatedLeaderIsrAndControllerEpochs" This reverts commit 8100fd92afd6c4615bf7a6998a8a8eeb0d628d4e. --- core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index e44c2c9..9590ecd 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -700,6 +700,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] = leaderIsrAndControllerEpochs(0, 0) + private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] = +leaderIsrAndControllerEpochs(state, state - 1) val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr) private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] = @@ -841,18 +843,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)), - zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map { + zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map { eraseMetadataAndStat}.toList) val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11) assertEquals(2, getResponses.size) -topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)} +topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)} // Other ZK client can also write the state of a partition assertEquals( expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)), - otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map { + otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map { eraseMetadataAndStat}.toList) } -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] 03/09: Revert "Minor fixes requested by reviewer"
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit ae52e1028e0ec4a13e10fba71c756e5956a43e69 Author: Jun Rao <jun...@gmail.com> AuthorDate: Thu Mar 1 18:02:07 2018 -0800 Revert "Minor fixes requested by reviewer" This reverts commit 1ae4c1d6095a86266ba6d008711570df0eaf5682. --- core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d6826a6..5cbb76e 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -1039,4 +1039,4 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.deleteDelegationToken(tokenId)) assertEquals(None, zkClient.getDelegationTokenInfo(tokenId)) } -} +} \ No newline at end of file -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] 02/09: Revert "Changes requeted by reviewer"
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit aa3453c157b43429a7e01c209aefc4e7979d1f2e Author: Jun Rao <jun...@gmail.com> AuthorDate: Thu Mar 1 18:02:07 2018 -0800 Revert "Changes requeted by reviewer" This reverts commit cacd377933ae0e7da0ed08dd33ab69fabde073c5. --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 103 ++--- 2 files changed, 52 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index d61b281..851c686 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -89,7 +89,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) val response = retryRequestUntilConnected(setDataRequest) -response.maybeThrow() + if (response.resultCode != Code.OK) + throw KeeperException.create(response.resultCode) info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 9590ecd..d6826a6 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { var otherZkClient: KafkaZkClient = _ @Before - override def setUp(): Unit = { + override def setUp() { super.setUp() otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) } @After - override def tearDown(): Unit = { + override def tearDown() { if (otherZkClient != null) otherZkClient.close() super.tearDown() @@ -131,8 +131,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty) assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty) +val topicPartition = new TopicPartition(topic1, 0) val assignment = Map( - new TopicPartition(topic1, 0) -> Seq(0, 1), + topicPartition -> Seq(0, 1), new TopicPartition(topic1, 1) -> Seq(0, 1), new TopicPartition(topic1, 2) -> Seq(1, 2, 3) ) @@ -310,10 +311,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testLogDirGetters(): Unit = { -assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node", - Seq.empty, zkClient.getAllLogDirEventNotifications) -assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node", - Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("00"))) +assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications) +assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("00"))) zkClient.createRecursive("/log_dir_event_notification") @@ -496,7 +495,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test - def testDeletePath(): Unit = { + def testDeletePath() { val path = "/a/b/c" zkClient.createRecursive(path) zkClient.deletePath(path) @@ -504,7 +503,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { } @Test - def testDeleteTopicZNode(): Unit = { + def testDeleteTopicZNode(): Unit ={ zkClient.deleteTopicZNode(topic1) zkClient.createRecursive(TopicZNode.path(topic1)) zkClient.deleteTopicZNode(topic1) @@ -526,20 +525,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertTrue(zkClient.getTopicDeletions.isEmpty) } - private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = { + private def assertPathExistenceAndData(expectedPath: String, data: String){ assertTrue(zkClient.pathExists(expectedPath)) assertEquals(Some(data), dataAsString(expectedPath)) } @Test - def testCreateTokenChangeNotification(): Unit = { + def testCreateTokenChangeNotification() { intercept[NoNodeException] { zkClient.createTokenChangeNotification("delegationToken") } zkClient.createDelegationTokenPaths() zkClient.createTokenChangeNotification("delegationToken") - as
[kafka] 08/09: Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit f6b39067cff96a046bedee17e49d0c476138dd87 Author: Jun Rao <jun...@gmail.com> AuthorDate: Thu Mar 1 18:02:07 2018 -0800 Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children" This reverts commit af598af20828149bdd45336377e70d26360452fe. --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 851c686..145e294 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -426,7 +426,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) + deleteLogDirEventNotifications(getChildrenResponse.children) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] 04/09: Revert "Fixes requested by reviewer"
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git commit 4110ad5208621a4bc0d184e95c909662b2819204 Author: Jun Rao <jun...@gmail.com> AuthorDate: Thu Mar 1 18:02:07 2018 -0800 Revert "Fixes requested by reviewer" This reverts commit ae51a15026e0dc10ef2df4bcafc85807dc8b4eb6. --- core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 5cbb76e..28dbb73 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -19,6 +19,7 @@ package kafka.zk import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.{CountDownLatch, TimeUnit} +import javax.security.auth.login.Configuration import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} @@ -56,7 +57,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { val topicPartition20 = new TopicPartition(topic2, 0) val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) - var otherZkClient: KafkaZkClient = _ + var otherZkClient: KafkaZkClient = null @Before override def setUp() { -- To stop receiving notification emails like this one, please contact jun...@apache.org.
[kafka] branch trunk updated: KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aaf8e02 KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745) aaf8e02 is described below commit aaf8e0240355d7eb3828583f61af14dc3c3b Author: Jun Rao AuthorDate: Fri Oct 12 10:11:54 2018 -0700 KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745) Reviewers: Dong Lin --- core/src/main/scala/kafka/controller/PartitionStateMachine.scala| 6 +++--- .../scala/unit/kafka/controller/PartitionStateMachineTest.scala | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 663ee8d..e4f0532 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -441,12 +441,12 @@ class PartitionStateMachine(config: KafkaConfig, Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = { leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => val assignment = controllerContext.partitionReplicaAssignment(partition) - val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) + val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true)) val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveReplicas.toSet, shuttingDownBrokers) + val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers) val newIsr = isr.filter(replica => !controllerContext.shuttingDownBrokerIds.contains(replica)) val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)) - (partition, newLeaderAndIsrOpt, liveReplicas) + (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas) } } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index b89632e..3370b54 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -192,7 +192,9 @@ class PartitionStateMachineTest extends JUnitSuite { val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) - EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), + +// The leaderAndIsr request should be sent to both brokers, including the shutting down one + EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId, otherBrokerId), partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) @@ -455,5 +457,4 @@ class PartitionStateMachineTest extends JUnitSuite { topicDeletionManager.enqueueTopicsForDeletion(Set(topic)) assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount) } - }
[kafka] branch 2.1 updated: KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new c7131fc KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745) c7131fc is described below commit c7131fc7a3efcc338321f72de1f159f1f06cfcfb Author: Jun Rao AuthorDate: Fri Oct 12 10:11:54 2018 -0700 KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745) Reviewers: Dong Lin --- core/src/main/scala/kafka/controller/PartitionStateMachine.scala| 6 +++--- .../scala/unit/kafka/controller/PartitionStateMachineTest.scala | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 663ee8d..e4f0532 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -441,12 +441,12 @@ class PartitionStateMachine(config: KafkaConfig, Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = { leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => val assignment = controllerContext.partitionReplicaAssignment(partition) - val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) + val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true)) val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveReplicas.toSet, shuttingDownBrokers) + val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers) val newIsr = isr.filter(replica => !controllerContext.shuttingDownBrokerIds.contains(replica)) val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)) - (partition, newLeaderAndIsrOpt, liveReplicas) + (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas) } } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index b89632e..3370b54 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -192,7 +192,9 @@ class PartitionStateMachineTest extends JUnitSuite { val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) - EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), + +// The leaderAndIsr request should be sent to both brokers, including the shutting down one + EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId, otherBrokerId), partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) @@ -455,5 +457,4 @@ class PartitionStateMachineTest extends JUnitSuite { topicDeletionManager.enqueueTopicsForDeletion(Set(topic)) assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount) } - }
[kafka] branch 2.0 updated: KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new a724c45 KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515) a724c45 is described below commit a724c454b85ca17b3affd23e2e82cd2912bc7513 Author: huxi AuthorDate: Fri Aug 17 05:54:58 2018 +0800 KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515) Reviewers: Jun Rao --- .../main/scala/kafka/controller/KafkaController.scala | 17 +++-- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 11d22fd..f3192a3 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -990,16 +990,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions // that need to be on this broker if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { -topicsNotInPreferredReplica.keys.foreach { topicPartition => - // do this check only if the broker is live and there are no partitions being reassigned currently - // and preferred replica election is not in progress - if (controllerContext.isReplicaOnline(leaderBroker, topicPartition) && -controllerContext.partitionsBeingReassigned.isEmpty && - !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) && -controllerContext.allTopics.contains(topicPartition.topic)) { -onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true) - } -} +// do this check only if the broker is live and there are no partitions being reassigned currently +// and preferred replica election is not in progress +val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && + controllerContext.partitionsBeingReassigned.isEmpty && + !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && + controllerContext.allTopics.contains(tp.topic)) +onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true) } } }
[kafka] branch 1.1 updated: KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 2e97856 KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515) 2e97856 is described below commit 2e978568e0c6ad87e13d2fa3fe983a2dcddeb38a Author: huxi AuthorDate: Fri Aug 17 05:54:58 2018 +0800 KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515) Reviewers: Jun Rao --- .../main/scala/kafka/controller/KafkaController.scala | 17 +++-- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 97b6406..610152f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -993,16 +993,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions // that need to be on this broker if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { -topicsNotInPreferredReplica.keys.foreach { topicPartition => - // do this check only if the broker is live and there are no partitions being reassigned currently - // and preferred replica election is not in progress - if (controllerContext.isReplicaOnline(leaderBroker, topicPartition) && -controllerContext.partitionsBeingReassigned.isEmpty && - !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) && -controllerContext.allTopics.contains(topicPartition.topic)) { -onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true) - } -} +// do this check only if the broker is live and there are no partitions being reassigned currently +// and preferred replica election is not in progress +val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && + controllerContext.partitionsBeingReassigned.isEmpty && + !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && + controllerContext.allTopics.contains(tp.topic)) +onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true) } } }
[kafka-site] branch asf-site updated: added verification file for google search tools (#167)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new f0a9f0f added verification file for google search tools (#167) f0a9f0f is described below commit f0a9f0fa8e225ece6074deb07ba2f1c631f1181d Author: Derrick Or AuthorDate: Fri Nov 9 13:33:20 2018 -0800 added verification file for google search tools (#167) Reviewers: Jun Rao --- googlebc99ea69c42cb214.html | 1 + 1 file changed, 1 insertion(+) diff --git a/googlebc99ea69c42cb214.html b/googlebc99ea69c42cb214.html new file mode 100644 index 000..79763ff --- /dev/null +++ b/googlebc99ea69c42cb214.html @@ -0,0 +1 @@ +google-site-verification: googlebc99ea69c42cb214.html \ No newline at end of file
[kafka] branch trunk updated: KAFKA-7412: clarify the doc for producer callback (#5798)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 895c83f KAFKA-7412: clarify the doc for producer callback (#5798) 895c83f is described below commit 895c83f88d351ac1f86e5df13af23423e162b774 Author: huxi AuthorDate: Fri Nov 9 08:58:14 2018 +0800 KAFKA-7412: clarify the doc for producer callback (#5798) The metadata in the callback is not null with non-null exception. Reviewers: Jun Rao --- .../main/java/org/apache/kafka/clients/producer/Callback.java | 11 ++- examples/src/main/java/kafka/examples/Producer.java | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java index a70e4e9..f7d4bcd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java @@ -24,10 +24,11 @@ public interface Callback { /** * A callback method the user can implement to provide asynchronous handling of request completion. This method will - * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be - * non-null. - * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error - *occurred. + * be called when the record sent to the server has been acknowledged. When exception is not null in the callback, + * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid. + * + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata + * with -1 value for all fields except for topicPartition will be returned if an error occurred. * @param exception The exception thrown during processing of this record. Null if no error occurred. * Possible thrown exceptions include: * @@ -49,5 +50,5 @@ public interface Callback { * TimeoutException * UnknownTopicOrPartitionException */ -public void onCompletion(RecordMetadata metadata, Exception exception); +void onCompletion(RecordMetadata metadata, Exception exception); } diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 8721280..b6998c5 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -81,8 +81,8 @@ class DemoCallBack implements Callback { /** * A callback method the user can implement to provide asynchronous handling of request completion. This method will - * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be - * non-null. + * be called when the record sent to the server has been acknowledged. When exception is not null in the callback, + * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid. * * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error * occurred.
[kafka] branch trunk updated: KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7b5ffa0 KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869) 7b5ffa0 is described below commit 7b5ffa0a070065e5e8320f481bbd8a3a26378f91 Author: Zhanxiang (Patrick) Huang AuthorDate: Tue Nov 6 15:28:53 2018 -0800 KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint (#5869) This PR avoids sending out full UpdateMetadataReuqest in the following scenarios: 1. On broker startup, send out full UpdateMetadataRequest to newly added brokers and only send out UpdateMetadataReuqest with empty partition states to existing brokers. 2. On broker failure, if it doesn't require leader election, only include the states of partitions that are hosted by the dead broker(s) in the UpdateMetadataReuqest instead of including all partition states. This PR also introduces a minor optimization in the MetadataCache update to avoid copying the previous partition states upon receiving UpdateMetadataRequest with no partition states. Reviewers: Jun Rao --- .../controller/ControllerChannelManager.scala | 7 +-- .../scala/kafka/controller/KafkaController.scala | 21 + .../main/scala/kafka/server/MetadataCache.scala| 44 ++ .../controller/ControllerIntegrationTest.scala | 53 ++ 4 files changed, 90 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 85da8b8..a11f553 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge } } -val givenPartitions = if (partitions.isEmpty) - controllerContext.partitionLeadershipInfo.keySet -else - partitions - updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0) -givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, +partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic))) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 740ab7f..a52f3f0 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and // partitionStateMachine.startup(). info("Sending update metadata request") - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) + sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty) replicaStateMachine.startup() partitionStateMachine.startup() @@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti info(s"New broker startup callback for ${newBrokers.mkString(",")}") newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove) val newBrokersSet = newBrokers.toSet -// send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new -// broker via this update. +val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers +// Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers +// via this update. No need to include any partition states in the request since there are no partition state changes. +sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty) +// Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization. // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the -// common controlled shutdown case, the metadata will reach the new brokers faster - sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) +// common controlled shutdown case, the metadata will reach the new brokers faster. +
[kafka] branch trunk updated: KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new eb3335e KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575) eb3335e is described below commit eb3335ef592a3dd22895a5bb499a2d0b232227a7 Author: Jonathan Santilli AuthorDate: Thu Nov 8 21:28:37 2018 + KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper (#5575) * Add logic to retry the BrokerInfo registration into ZooKeeper In case the ZooKeeper session has been regenerated and the broker tries to register the BrokerInfo into Zookeeper, this code deletes the current BrokerInfo from Zookeeper and creates it again, just if the znode ephemeral owner belongs to the Broker which tries to register himself again into ZooKeeper * Add test to validate the BrokerInfo re-registration into ZooKeeper --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 88 -- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 60 +++ 2 files changed, 141 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index a12abb4..4ad40ef 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -53,7 +53,7 @@ import scala.collection.JavaConverters._ * easier to quickly migrate away from `ZkUtils`. We should revisit this once the migration is completed and tests are * in place. We should also consider whether a monolithic [[kafka.zk.ZkData]] is the way to go. */ -class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with +class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time) extends AutoCloseable with Logging with KafkaMetricsGroup { override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { @@ -67,6 +67,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean // Only for testing private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper + // This variable holds the Zookeeper session id at the moment a Broker gets registered in Zookeeper and the subsequent + // updates of the session id. It is possible that the session id changes over the time for 'Session expired'. + // This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must + // be deleted. + private var currentZooKeeperSessionId: Long = -1 + /** * Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect * and a monotonically increasing number will be appended to its name. @@ -1585,7 +1591,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path) - private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { + private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { retryRequestsUntilConnected(Seq(request)).head } @@ -1631,26 +1637,94 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean throw KeeperException.create(code) } + private def isZKSessionIdDiffFromCurrentZKSessionId(): Boolean = { +zooKeeperClient.sessionId != currentZooKeeperSessionId + } + + private def isZKSessionTheEphemeralOwner(ephemeralOwnerId: Long): Boolean = { +ephemeralOwnerId == currentZooKeeperSessionId + } + + private[zk] def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = { +isZKSessionTheEphemeralOwner(ephemeralOwnerId) && isZKSessionIdDiffFromCurrentZKSessionId() + } + + private def updateCurrentZKSessionId(newSessionId: Long): Unit = { +currentZooKeeperSessionId = newSessionId + } + private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging { def create(): Code = { val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL) val createResponse = retryRequestUntilConnected(createRequest) - createResponse.resultCode match { -case code@ Code.OK => code -case Code.NODEEXISTS => getAfterNodeExists() + val createResultCode = createResponse.resultCode match { +case code@ Code.OK => + code +case Code.NODEEXISTS => + getAfterNodeExists() case code => error(s"Error while creating ephemeral at $path with return code: $code") code } + + if (createResultCode == Code.OK) { +
[kafka] branch trunk updated: KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3eaf44b KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848) 3eaf44b is described below commit 3eaf44ba8ea26a7a820894390e8877d404ddd5a2 Author: huxi AuthorDate: Tue Nov 13 01:02:44 2018 +0800 KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848) Instead of calling deleteSnapshotsAfterRecoveryPointCheckpoint for allLogs, invoking it only for the logs being truncated. Reviewers: Ismael Juma , Jun Rao --- core/src/main/scala/kafka/log/LogManager.scala | 60 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 29 +++ 2 files changed, 66 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 26bfbe9..508dcd0 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -440,6 +440,8 @@ class LogManager(logDirs: Seq[File], CoreUtils.swallow(cleaner.shutdown(), this) } +val localLogsByDir = logsByDir + // close logs in each dir for (dir <- liveLogDirs) { debug(s"Flushing and closing logs at $dir") @@ -447,7 +449,7 @@ class LogManager(logDirs: Seq[File], val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) threadPools.append(pool) - val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values + val logsInDir = localLogsByDir.getOrElse(dir.toString, Map()).values val jobsForDir = logsInDir map { log => CoreUtils.runnable { @@ -466,7 +468,7 @@ class LogManager(logDirs: Seq[File], // update the last flush point debug(s"Updating recovery points at $dir") -checkpointLogRecoveryOffsetsInDir(dir) +checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq) debug(s"Updating log start offsets at $dir") checkpointLogStartOffsetsInDir(dir) @@ -495,7 +497,7 @@ class LogManager(logDirs: Seq[File], * @param isFuture True iff the truncation should be performed on the future log of the specified partitions */ def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) { -var truncated = false +val affectedLogs = ArrayBuffer.empty[Log] for ((topicPartition, truncateOffset) <- partitionOffsets) { val log = { if (isFuture) @@ -511,7 +513,7 @@ class LogManager(logDirs: Seq[File], cleaner.abortAndPauseCleaning(topicPartition) try { if (log.truncateTo(truncateOffset)) -truncated = true +affectedLogs += log if (needToStopCleaner && !isFuture) cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) } finally { @@ -523,8 +525,9 @@ class LogManager(logDirs: Seq[File], } } -if (truncated) - checkpointLogRecoveryOffsets() +for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) { + checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs) +} } /** @@ -557,7 +560,7 @@ class LogManager(logDirs: Seq[File], info(s"Compaction for partition $topicPartition is resumed") } } - checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile) + checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log)) } } @@ -566,7 +569,11 @@ class LogManager(logDirs: Seq[File], * to avoid recovering the whole log on startup. */ def checkpointLogRecoveryOffsets() { -liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir) +logsByDir.foreach { case (dir, partitionToLogMap) => + liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f => +checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq) + } +} } /** @@ -578,21 +585,29 @@ class LogManager(logDirs: Seq[File], } /** - * Make a checkpoint for all logs in provided directory. - */ +* Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs. +* +* @param dir the directory in which logs are checkpointed +* @param logsToCleanSnapshot logs whose snapshots need to be cleaned +*/ + // Only for testing + private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = { +try { + checkpointLogRecoveryOffsetsInDir(dir) + logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint()) +} catch { + case e: IOException => +logDirFailureChannel.m
[kafka] branch trunk updated: KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0848b78 KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728) 0848b78 is described below commit 0848b78881afce5899cea1f10c323249f9f0b8cc Author: Manikumar Reddy O AuthorDate: Tue Oct 9 23:07:54 2018 +0530 KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728) Reviewers: Ismael Juma and Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 22 - core/src/main/scala/kafka/log/LogSegment.scala | 14 +-- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 28 ++ core/src/test/scala/unit/kafka/log/LogTest.scala | 2 +- core/src/test/scala/unit/kafka/log/LogUtils.scala | 3 +-- .../kafka/server/DynamicConfigChangeTest.scala | 27 + 6 files changed, 68 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 094473a..bc328d7 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -146,6 +146,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i } /** + * A class used to hold params required to decide to rotate a log segment or not. + */ +case class RollParams(maxSegmentMs: Long, + maxSegmentBytes: Int, + maxTimestampInMessages: Long, + maxOffsetInMessages: Long, + messagesSize: Int, + now: Long) + +object RollParams { + def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = { + new RollParams(config.segmentMs, + config.segmentSize, + appendInfo.maxTimestamp, + appendInfo.lastOffset, + messagesSize, now) + } +} + +/** * An append-only log for storing messages. * * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment. @@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File, val maxTimestampInMessages = appendInfo.maxTimestamp val maxOffsetInMessages = appendInfo.lastOffset -if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) { +if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) { debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " + s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 80763a8..d910a29 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -45,8 +45,10 @@ import scala.math._ * @param log The file records containing log entries * @param offsetIndex The offset index * @param timeIndex The timestamp index + * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index + * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */ @nonthreadsafe @@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, - val maxSegmentMs: Long, - val maxSegmentBytes: Int, val time: Time) extends Logging { - def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = { -val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs -size > maxSegmentBytes - messagesSize || + def shouldRoll(rollParams: RollParams): Boolean = { +val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs +size > rollParams.maxSegmentBytes - rollParams.messagesSize || (size > 0 && reachedRollMs) || - offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages) + offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages) } def resizeIndexes(size: Int): Unit = { @@ -
[kafka] branch 2.1 updated: KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new acec50e KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728) acec50e is described below commit acec50e19fb202401bca9d18aac60ba19e647ec4 Author: Manikumar Reddy O AuthorDate: Tue Oct 9 23:07:54 2018 +0530 KAFKA-7366: Make topic configs segment.bytes and segment.ms to take effect immediately (#5728) Reviewers: Ismael Juma and Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 22 - core/src/main/scala/kafka/log/LogSegment.scala | 14 +-- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 28 ++ core/src/test/scala/unit/kafka/log/LogTest.scala | 2 +- core/src/test/scala/unit/kafka/log/LogUtils.scala | 3 +-- .../kafka/server/DynamicConfigChangeTest.scala | 27 + 6 files changed, 68 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 094473a..bc328d7 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -146,6 +146,26 @@ case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, i } /** + * A class used to hold params required to decide to rotate a log segment or not. + */ +case class RollParams(maxSegmentMs: Long, + maxSegmentBytes: Int, + maxTimestampInMessages: Long, + maxOffsetInMessages: Long, + messagesSize: Int, + now: Long) + +object RollParams { + def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = { + new RollParams(config.segmentMs, + config.segmentSize, + appendInfo.maxTimestamp, + appendInfo.lastOffset, + messagesSize, now) + } +} + +/** * An append-only log for storing messages. * * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment. @@ -1493,7 +1513,7 @@ class Log(@volatile var dir: File, val maxTimestampInMessages = appendInfo.maxTimestamp val maxOffsetInMessages = appendInfo.lastOffset -if (segment.shouldRoll(messagesSize, maxTimestampInMessages, maxOffsetInMessages, now)) { +if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) { debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " + s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 80763a8..d910a29 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -45,8 +45,10 @@ import scala.math._ * @param log The file records containing log entries * @param offsetIndex The offset index * @param timeIndex The timestamp index + * @param txnIndex The transaction index * @param baseOffset A lower bound on the offsets in this segment * @param indexIntervalBytes The approximate number of bytes between entries in the index + * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance */ @nonthreadsafe @@ -57,15 +59,13 @@ class LogSegment private[log] (val log: FileRecords, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, - val maxSegmentMs: Long, - val maxSegmentBytes: Int, val time: Time) extends Logging { - def shouldRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long, now: Long): Boolean = { -val reachedRollMs = timeWaitedForRoll(now, maxTimestampInMessages) > maxSegmentMs - rollJitterMs -size > maxSegmentBytes - messagesSize || + def shouldRoll(rollParams: RollParams): Boolean = { +val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs +size > rollParams.maxSegmentBytes - rollParams.messagesSize || (size > 0 && reachedRollMs) || - offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(maxOffsetInMessages) + offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages) } def resizeIndexes(size: Int): Unit = { @@ -
[kafka] branch 2.1 updated: KAFKA-3097: Update docs to mention PrincipalType "User" is case sensitive (#5734)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 4fe48f3 KAFKA-3097: Update docs to mention PrincipalType "User" is case sensitive (#5734) 4fe48f3 is described below commit 4fe48f36a276f8875241feab61ce618d5cdced80 Author: Manikumar Reddy O AuthorDate: Wed Oct 10 00:22:57 2018 +0530 KAFKA-3097: Update docs to mention PrincipalType "User" is case sensitive (#5734) Reviewers: Jun Rao --- docs/security.html | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/security.html b/docs/security.html index 72ba239..b018334 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1018,7 +1018,7 @@ authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want [...] allow.everyone.if.no.acl.found=true -One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). +One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive. super.users=User:Bob;User:Alice By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in server.properties like the following. principal.builder.class=CustomizedPrincipalBuilderClass @@ -1119,19 +1119,19 @@ --allow-principal -Principal is in PrincipalType:name format that will be added to ACL with Allow permission. You can specify multiple --allow-principal in a single command. +Principal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string "User" is case sensitive. You can specify multiple --allow-principal in a single command. Principal --deny-principal -Principal is in PrincipalType:name format that will be added to ACL with Deny permission. You can specify multiple --deny-principal in a single command. +Principal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string "User" is case sensitive. You can specify multiple --deny-principal in a single command. Principal --principal -Principal is in PrincipalType:name format that will be used along with --list option. This will list the ACLs for the specified principal. You can specify multiple --principal in a single command. +Principal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. You can specify multiple --principal in a single command. Principal
[kafka] branch trunk updated: KAFKA-3097: Update docs to mention PrincipalType "User" is case sensitive (#5734)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 15dbab0 KAFKA-3097: Update docs to mention PrincipalType "User" is case sensitive (#5734) 15dbab0 is described below commit 15dbab0e35063714b7bbea58176d1627878c15c3 Author: Manikumar Reddy O AuthorDate: Wed Oct 10 00:22:57 2018 +0530 KAFKA-3097: Update docs to mention PrincipalType "User" is case sensitive (#5734) Reviewers: Jun Rao --- docs/security.html | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/security.html b/docs/security.html index 72ba239..b018334 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1018,7 +1018,7 @@ authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer Kafka acls are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP". You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want [...] allow.everyone.if.no.acl.found=true -One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). +One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive. super.users=User:Bob;User:Alice By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in server.properties like the following. principal.builder.class=CustomizedPrincipalBuilderClass @@ -1119,19 +1119,19 @@ --allow-principal -Principal is in PrincipalType:name format that will be added to ACL with Allow permission. You can specify multiple --allow-principal in a single command. +Principal is in PrincipalType:name format that will be added to ACL with Allow permission. Default PrincipalType string "User" is case sensitive. You can specify multiple --allow-principal in a single command. Principal --deny-principal -Principal is in PrincipalType:name format that will be added to ACL with Deny permission. You can specify multiple --deny-principal in a single command. +Principal is in PrincipalType:name format that will be added to ACL with Deny permission. Default PrincipalType string "User" is case sensitive. You can specify multiple --deny-principal in a single command. Principal --principal -Principal is in PrincipalType:name format that will be used along with --list option. This will list the ACLs for the specified principal. You can specify multiple --principal in a single command. +Principal is in PrincipalType:name format that will be used along with --list option. Default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal. You can specify multiple --principal in a single command. Principal
[kafka] branch trunk updated: KAFKA-7215: Improve LogCleaner Error Handling (#5439)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 13379af KAFKA-7215: Improve LogCleaner Error Handling (#5439) 13379af is described below commit 13379af17d95788568038c0c6daeb983819669e3 Author: Stanislav Kozlovski AuthorDate: Mon Oct 8 20:54:37 2018 +0100 KAFKA-7215: Improve LogCleaner Error Handling (#5439) The thread no longer dies. When encountering an unexpected error, it marks the partition as "uncleanable" which means it will not try to clean its logs in subsequent runs. Reviewers: Dhruvil Shah , Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 2 +- core/src/main/scala/kafka/log/LogCleaner.scala | 107 -- .../main/scala/kafka/log/LogCleanerManager.scala | 86 - .../scala/kafka/server/LogDirFailureChannel.scala | 3 +- .../log/AbstractLogCleanerIntegrationTest.scala| 30 ++ .../unit/kafka/log/LogCleanerIntegrationTest.scala | 389 - .../kafka/log/LogCleanerLagIntegrationTest.scala | 12 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 193 -- ...> LogCleanerParameterizedIntegrationTest.scala} | 35 +- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 7 +- core/src/test/scala/unit/kafka/log/LogUtils.scala | 41 +++ 11 files changed, 479 insertions(+), 426 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8915c14..094473a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -193,7 +193,7 @@ class Log(@volatile var dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object - // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers() + // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() // After memory mapped buffer is closed, no disk IO operation should be performed for this log @volatile private var isMemoryMappedBufferClosed = false diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index bf4f7e1..0416325 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ import scala.collection.{Iterable, Set, mutable} +import scala.util.control.ControlThrowable /** * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. @@ -293,49 +294,75 @@ class LogCleaner(initialConfig: CleanerConfig, /** * The main loop for the cleaner thread + * Clean a log if there is a dirty log available, otherwise sleep for a bit */ override def doWork() { - cleanOrSleep() + val cleaned = cleanFilthiestLog() + if (!cleaned) +pause(config.backOffMs, TimeUnit.MILLISECONDS) } /** - * Clean a log if there is a dirty log available, otherwise sleep for a bit - */ -private def cleanOrSleep() { - val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { -case None => - false -case Some(cleanable) => - // there's a log, clean it - var endOffset = cleanable.firstDirtyOffset - try { -val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) -recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) -endOffset = nextDirtyOffset - } catch { -case _: LogCleaningAbortedException => // task can be aborted, let it go. -case _: KafkaStorageException => // partition is already offline. let it go. -case e: IOException => - val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException" - logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e) - } finally { -cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) + * Cleans a log if there is a dirty log available + * @return whether a log was cleaned + */ +private def cleanFilthiestLog(): Boolean = { + var currentLog: Option[Log] = None + + try { +val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { + case None => +false + case Some(cleanable) => +// there's a log, clean it +currentLog
[kafka] branch 2.1 updated: KAFKA-7215: Improve LogCleaner Error Handling (#5439)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 07c5282 KAFKA-7215: Improve LogCleaner Error Handling (#5439) 07c5282 is described below commit 07c5282d25cb04cf1212c3daec5c7d8798f9efa1 Author: Stanislav Kozlovski AuthorDate: Mon Oct 8 20:54:37 2018 +0100 KAFKA-7215: Improve LogCleaner Error Handling (#5439) The thread no longer dies. When encountering an unexpected error, it marks the partition as "uncleanable" which means it will not try to clean its logs in subsequent runs. Reviewers: Dhruvil Shah , Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 2 +- core/src/main/scala/kafka/log/LogCleaner.scala | 107 -- .../main/scala/kafka/log/LogCleanerManager.scala | 86 - .../scala/kafka/server/LogDirFailureChannel.scala | 3 +- .../log/AbstractLogCleanerIntegrationTest.scala| 30 ++ .../unit/kafka/log/LogCleanerIntegrationTest.scala | 389 - .../kafka/log/LogCleanerLagIntegrationTest.scala | 12 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 193 -- ...> LogCleanerParameterizedIntegrationTest.scala} | 35 +- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 7 +- core/src/test/scala/unit/kafka/log/LogUtils.scala | 41 +++ 11 files changed, 479 insertions(+), 426 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8915c14..094473a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -193,7 +193,7 @@ class Log(@volatile var dir: File, /* A lock that guards all modifications to the log */ private val lock = new Object - // The memory mapped buffer for index files of this log will be closed for index files of this log will be closed with either delete() or closeHandlers() + // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() // After memory mapped buffer is closed, no disk IO operation should be performed for this log @volatile private var isMemoryMappedBufferClosed = false diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index bf4f7e1..0416325 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time import scala.collection.JavaConverters._ import scala.collection.{Iterable, Set, mutable} +import scala.util.control.ControlThrowable /** * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. @@ -293,49 +294,75 @@ class LogCleaner(initialConfig: CleanerConfig, /** * The main loop for the cleaner thread + * Clean a log if there is a dirty log available, otherwise sleep for a bit */ override def doWork() { - cleanOrSleep() + val cleaned = cleanFilthiestLog() + if (!cleaned) +pause(config.backOffMs, TimeUnit.MILLISECONDS) } /** - * Clean a log if there is a dirty log available, otherwise sleep for a bit - */ -private def cleanOrSleep() { - val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { -case None => - false -case Some(cleanable) => - // there's a log, clean it - var endOffset = cleanable.firstDirtyOffset - try { -val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable) -recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats) -endOffset = nextDirtyOffset - } catch { -case _: LogCleaningAbortedException => // task can be aborted, let it go. -case _: KafkaStorageException => // partition is already offline. let it go. -case e: IOException => - val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException" - logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e) - } finally { -cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) + * Cleans a log if there is a dirty log available + * @return whether a log was cleaned + */ +private def cleanFilthiestLog(): Boolean = { + var currentLog: Option[Log] = None + + try { +val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match { + case None => +false + case Some(cleanable) => +// there's a log, clean it +currentLog
[kafka] branch trunk updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6ced855 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673) 6ced855 is described below commit 6ced8550b3183a71500a66728e9957c19456d4f9 Author: Manikumar Reddy O AuthorDate: Sat Sep 22 02:17:26 2018 +0530 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673) Reviewers: Jun Rao --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 21 + 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 7ec572c..892377c 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger import kafka.api.KAFKA_2_0_IV1 import kafka.network.RequestChannel.Session -import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls} +import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls} import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ @@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} import scala.collection.JavaConverters._ -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} object SimpleAclAuthorizer { //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in @@ -267,12 +267,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging { ZkAclStore.stores.foreach(store => { val resourceTypes = zkClient.getResourceTypes(store.patternType) for (rType <- resourceTypes) { - val resourceType = ResourceType.fromString(rType) - val resourceNames = zkClient.getResourceNames(store.patternType, resourceType) - for (resourceName <- resourceNames) { -val resource = new Resource(resourceType, resourceName, store.patternType) -val versionedAcls = getAclsFromZk(resource) -updateCache(resource, versionedAcls) + val resourceType = Try(ResourceType.fromString(rType)) + resourceType match { +case Success(resourceTypeObj) => { + val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj) + for (resourceName <- resourceNames) { +val resource = new Resource(resourceTypeObj, resourceName, store.patternType) +val versionedAcls = getAclsFromZk(resource) +updateCache(resource, versionedAcls) + } +} +case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") } } })
[kafka] branch 2.0 updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.0 by this push: new cfd33b3 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673) cfd33b3 is described below commit cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b Author: Manikumar Reddy O AuthorDate: Sat Sep 22 02:17:26 2018 +0530 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5673) Reviewers: Jun Rao --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 21 + 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 9472411..6de81d2 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger import kafka.api.KAFKA_2_0_IV1 import kafka.network.RequestChannel.Session -import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls} +import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls} import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ @@ -33,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} import scala.collection.JavaConverters._ -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} object SimpleAclAuthorizer { //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in @@ -259,12 +259,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging { ZkAclStore.stores.foreach(store => { val resourceTypes = zkClient.getResourceTypes(store.patternType) for (rType <- resourceTypes) { - val resourceType = ResourceType.fromString(rType) - val resourceNames = zkClient.getResourceNames(store.patternType, resourceType) - for (resourceName <- resourceNames) { -val resource = new Resource(resourceType, resourceName, store.patternType) -val versionedAcls = getAclsFromZk(resource) -updateCache(resource, versionedAcls) + val resourceType = Try(ResourceType.fromString(rType)) + resourceType match { +case Success(resourceTypeObj) => { + val resourceNames = zkClient.getResourceNames(store.patternType, resourceTypeObj) + for (resourceName <- resourceNames) { +val resource = new Resource(resourceTypeObj, resourceName, store.patternType) +val versionedAcls = getAclsFromZk(resource) +updateCache(resource, versionedAcls) + } +} +case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") } } })
[kafka] branch trunk updated: KAFKA-7400: Compacted topic segments that precede the log start offse… (#5646)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0bc7008 KAFKA-7400: Compacted topic segments that precede the log start offse… (#5646) 0bc7008 is described below commit 0bc7008e7559337b588c60366d1b935eace59ffc Author: Bob Barrett AuthorDate: Fri Sep 21 13:31:45 2018 -0700 KAFKA-7400: Compacted topic segments that precede the log start offse… (#5646) * KAFKA-7400: Compacted topic segments that precede the log start offset are not cleaned up Currently we don't delete any log segments if the cleanup policy doesn't include delete. This patch changes the behavior to delete log segments that fully precede the log start offset even when deletion is not enabled. Tested with unit tests to verify that LogManager.cleanupLogs now cleans logs with cleanup.policy=compact and that Log.deleteOldSegments deletes segments that preced the start offset regardless of the cleanup policy. Reviewers: Dhruvil Shah , Jason Gustafson , Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 13 +++ .../main/scala/kafka/log/LogCleanerManager.scala | 5 +++-- .../unit/kafka/log/LogCleanerManagerTest.scala | 8 +++ .../test/scala/unit/kafka/log/LogManagerTest.scala | 17 -- core/src/test/scala/unit/kafka/log/LogTest.scala | 26 ++ 5 files changed, 57 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index afe151d..c9b877b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1353,12 +1353,17 @@ class Log(@volatile var dir: File, } /** - * Delete any log segments that have either expired due to time based retention - * or because the log size is > retentionSize + * If topic deletion is enabled, delete any log segments that have either expired due to time based retention + * or because the log size is > retentionSize. + * + * Whether or not deletion is enabled, delete any log segments that are before the log start offset */ def deleteOldSegments(): Int = { -if (!config.delete) return 0 -deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments() +if (config.delete) { + deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments() +} else { + deleteLogStartOffsetBreachedSegments() +} } private def deleteRetentionMsBreachedSegments(): Int = { diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 83d902f..680fa94 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -171,12 +171,13 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } /** -* Find any logs that have compact and delete enabled +* Find any logs that have compaction enabled. Include logs without delete enabled, as they may have segments +* that precede the start offset. */ def deletableLogs(): Iterable[(TopicPartition, Log)] = { inLock(lock) { val toClean = logs.filter { case (topicPartition, log) => -!inProgress.contains(topicPartition) && isCompactAndDelete(log) +!inProgress.contains(topicPartition) && log.config.compact } toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) } toClean diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala index 8cb2f9e..3653e28 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala @@ -77,17 +77,17 @@ class LogCleanerManagerTest extends JUnitSuite with Logging { } /** -* When looking for logs with segments ready to be deleted we shouldn't consider -* logs with cleanup.policy=compact as they shouldn't have segments truncated. +* When looking for logs with segments ready to be deleted we should consider +* logs with cleanup.policy=compact because they may have segments from before the log start offset */ @Test - def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyCompactLogs(): Unit = { + def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = { val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) val cleanerManager: LogCleanerManager = createCleanerManager(log)
[kafka] branch 1.0 updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new 8e6ffd2 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679) 8e6ffd2 is described below commit 8e6ffd2bb8f6a54b4d2298b6faf66a1035875bef Author: Manikumar Reddy O AuthorDate: Wed Sep 26 06:51:14 2018 +0530 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679) Reviewers: Jun Rao --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 19 --- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 3a00226..d4ec4f2 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.collection.JavaConverters._ import org.apache.log4j.Logger -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} object SimpleAclAuthorizer { //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in @@ -235,12 +235,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging { inWriteLock(lock) { val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath) for (rType <- resourceTypes) { -val resourceType = ResourceType.fromString(rType) -val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name -val resourceNames = zkUtils.getChildren(resourceTypePath) -for (resourceName <- resourceNames) { - val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString)) - updateCache(new Resource(resourceType, resourceName), versionedAcls) +val resourceType = Try(ResourceType.fromString(rType)) +resourceType match { + case Success(resourceTypeObj) => { +val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceTypeObj.name +val resourceNames = zkUtils.getChildren(resourceTypePath) +for (resourceName <- resourceNames) { + val versionedAcls = getAclsFromZk(Resource(resourceTypeObj, resourceName.toString)) + updateCache(new Resource(resourceTypeObj, resourceName), versionedAcls) +} + } + case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") } } }
[kafka] branch 1.1 updated: KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5680)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 8b1d283 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5680) 8b1d283 is described below commit 8b1d283d8945788341890af759d6ed2be27ced7b Author: Manikumar Reddy O AuthorDate: Wed Sep 26 06:49:51 2018 +0530 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5680) Reviewers: Jun Rao --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 19 --- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index da85b00..979f7f6 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.typesafe.scalalogging.Logger import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} import kafka.network.RequestChannel.Session -import kafka.security.auth.SimpleAclAuthorizer.{VersionedAcls, NoAcls} +import kafka.security.auth.SimpleAclAuthorizer.{NoAcls, VersionedAcls} import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ @@ -32,7 +32,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{SecurityUtils, Time} import scala.collection.JavaConverters._ -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} object SimpleAclAuthorizer { //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in @@ -216,11 +216,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging { inWriteLock(lock) { val resourceTypes = zkClient.getResourceTypes() for (rType <- resourceTypes) { -val resourceType = ResourceType.fromString(rType) -val resourceNames = zkClient.getResourceNames(resourceType.name) -for (resourceName <- resourceNames) { - val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName)) - updateCache(new Resource(resourceType, resourceName), versionedAcls) +val resourceType = Try(ResourceType.fromString(rType)) +resourceType match { + case Success(resourceTypeObj) => { +val resourceNames = zkClient.getResourceNames(resourceTypeObj.name) +for (resourceName <- resourceNames) { + val versionedAcls = getAclsFromZk(Resource(resourceTypeObj, resourceName)) + updateCache(new Resource(resourceTypeObj, resourceName), versionedAcls) +} + } + case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") } } }
[kafka] branch trunk updated: KAFKA-7829; Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in Kafka 1.1.0 or later (#6157)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6a7eebe KAFKA-7829; Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in Kafka 1.1.0 or later (#6157) 6a7eebe is described below commit 6a7eebe891d50ac0bf4d17bb870031718705fa37 Author: Dong Lin AuthorDate: Thu Jan 17 15:30:16 2019 -0800 KAFKA-7829; Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in Kafka 1.1.0 or later (#6157) Reviewers: Jun Rao --- .../org/apache/kafka/clients/admin/AdminClient.java | 20 .../kafka/admin/ReassignPartitionsCommand.scala | 3 +-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 1521ee5..bdd7cc3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -389,14 +389,17 @@ public abstract class AdminClient implements AutoCloseable { public abstract AlterConfigsResult alterConfigs(Map configs, AlterConfigsOptions options); /** - * Change the log directory for the specified replicas. This API is currently only useful if it is used - * before the replica has been created on the broker. It will support moving replicas that have already been created after - * KIP-113 is fully implemented. + * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result + * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the + * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given + * log directory if it is not already there. + * + * This operation is not transactional so it may succeed for some replicas while fail for others. * * This is a convenience method for #{@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options. * See the overload for more details. * - * This operation is supported by brokers with version 1.0.0 or higher. + * This operation is supported by brokers with version 1.1.0 or higher. * * @param replicaAssignment The replicas with their log directory absolute path * @return The AlterReplicaLogDirsResult @@ -406,13 +409,14 @@ public abstract class AdminClient implements AutoCloseable { } /** - * Change the log directory for the specified replicas. This API is currently only useful if it is used - * before the replica has been created on the broker. It will support moving replicas that have already been created after - * KIP-113 is fully implemented. + * Change the log directory for the specified replicas. If the replica does not exist on the broker, the result + * shows REPLICA_NOT_AVAILABLE for the given replica and the replica will be created in the given log directory on the + * broker when it is created later. If the replica already exists on the broker, the replica will be moved to the given + * log directory if it is not already there. * * This operation is not transactional so it may succeed for some replicas while fail for others. * - * This operation is supported by brokers with version 1.0.0 or higher. + * This operation is supported by brokers with version 1.1.0 or higher. * * @param replicaAssignment The replicas with their log directory absolute path * @param optionsThe options to use when changing replica dir diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 0f2f937..c108f07 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -466,8 +466,7 @@ object ReassignPartitionsCommand extends Logging { "The format to use is - \n" + "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3],\n\t \"log_dirs\": [\"dir1\",\"dir2\",\"dir3\"] }],\n\"version\":1\n}\n" + "Note that \"log_dirs\" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list " + - "can be either \"any\" or the absolution path of the log direct
[kafka] branch trunk updated: Fix KAFKA-7789 by increasing the key size for the RSA keys generated for (#6096)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d8f126d Fix KAFKA-7789 by increasing the key size for the RSA keys generated for (#6096) d8f126d is described below commit d8f126d70a5d67f283cf98dca60b41dbcd8b95ef Author: Tom Bentley AuthorDate: Mon Jan 14 22:00:30 2019 + Fix KAFKA-7789 by increasing the key size for the RSA keys generated for (#6096) Reviewers: Jun Rao --- clients/src/test/java/org/apache/kafka/test/TestSslUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 90b6d8d..b2de0e6 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -89,7 +89,7 @@ public class TestSslUtils { public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException { KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm); -keyGen.initialize(1024); +keyGen.initialize(2048); return keyGen.genKeyPair(); }
[kafka] branch trunk updated: MINOR: log when controller begins processing logdir failure event (#6153)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4b54eb4 MINOR: log when controller begins processing logdir failure event (#6153) 4b54eb4 is described below commit 4b54eb46212a5d40d814ab36315f589d69782675 Author: Dhruvil Shah AuthorDate: Thu Jan 17 18:30:54 2019 -0800 MINOR: log when controller begins processing logdir failure event (#6153) Reviewers: Jun Rao --- core/src/main/scala/kafka/controller/KafkaController.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 894f18c..c8cf446 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -336,10 +336,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti /* * This callback is invoked by the controller's LogDirEventNotificationListener with the list of broker ids who * have experienced new log directory failures. In response the controller should send LeaderAndIsrRequest - * to all these brokers to query the state of their replicas + * to all these brokers to query the state of their replicas. Replicas with an offline log directory respond with + * KAFKA_STORAGE_ERROR, which will be handled by the LeaderAndIsrResponseReceived event. */ private def onBrokerLogDirFailure(brokerIds: Seq[Int]) { // send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online. +info(s"Handling log directory failure for brokers ${brokerIds.mkString(",")}") val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet) replicaStateMachine.handleStateChanges(replicasOnBrokers.toSeq, OnlineReplica) }
[kafka] branch trunk updated: Forward topic from console consumer to deserializer (#5704)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fb9f2d8 Forward topic from console consumer to deserializer (#5704) fb9f2d8 is described below commit fb9f2d8c9b296c44f42ae3838380895f95171133 Author: Mathieu Chataigner AuthorDate: Thu Nov 29 03:23:30 2018 +0100 Forward topic from console consumer to deserializer (#5704) Some deserializer needs the topic name to be able to correctly deserialize the payload of the message. Console consumer works great with Deserializer however it calls deserializer with topic set as null. This breaks the API and the topic information is available in the ConsumerRecord. Reviewers: Manikumar Reddy , Chia-Ping Tsai , Gardner Vickers , Jun Rao --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 8 ++-- .../scala/kafka/tools/CustomDeserializerTest.scala | 53 ++ 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 42c5c5b..9a8c648 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -509,9 +509,9 @@ class DefaultMessageFormatter extends MessageFormatter { output.write(lineSeparator) } -def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte]) { +def write(deserializer: Option[Deserializer[_]], sourceBytes: Array[Byte], topic: String) { val nonNullBytes = Option(sourceBytes).getOrElse("null".getBytes(StandardCharsets.UTF_8)) - val convertedBytes = deserializer.map(_.deserialize(null, nonNullBytes).toString. + val convertedBytes = deserializer.map(_.deserialize(topic, nonNullBytes).toString. getBytes(StandardCharsets.UTF_8)).getOrElse(nonNullBytes) output.write(convertedBytes) } @@ -527,12 +527,12 @@ class DefaultMessageFormatter extends MessageFormatter { } if (printKey) { - write(keyDeserializer, key) + write(keyDeserializer, key, topic) writeSeparator(printValue) } if (printValue) { - write(valueDeserializer, value) + write(valueDeserializer, value, topic) output.write(lineSeparator) } } diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala new file mode 100644 index 000..37b5b79 --- /dev/null +++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.io.PrintStream + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.Deserializer +import org.hamcrest.CoreMatchers +import org.junit.Test +import org.junit.Assert.assertThat +import org.scalatest.mockito.MockitoSugar + +class CustomDeserializer extends Deserializer[String] { + override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = { + } + + override def deserialize(topic: String, data: Array[Byte]): String = { +assertThat("topic must not be null", topic, CoreMatchers.notNullValue()) +new String(data) + } + + override def close(): Unit = { + } +} + +class CustomDeserializerTest extends MockitoSugar { + + @Test + def checkDeserializerTopicIsNotNull(): Unit = { +val formatter = new DefaultMessageFormatter() +formatter.keyDeserializer = Some(new CustomDeserializer) + +formatter.writeTo(new ConsumerRecord("topic_test", 1, 1l, "key".getBytes, "value".getBytes), mock[PrintStream]) + +formatter.close() + } +}
[kafka] branch trunk updated (e7ce0e7 -> 2155c6d)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from e7ce0e7 KAFKA-4544: Add system tests for delegation token based authentication add 2155c6d KAFKA-7235: Detect outdated control requests and bounced brokers using broker generation (#5821) No new revisions were added by this update. Summary of changes: ...ception.java => StaleBrokerEpochException.java} | 6 +- .../org/apache/kafka/common/protocol/Errors.java | 5 +- .../apache/kafka/common/protocol/types/Struct.java | 12 +- .../common/requests/AbstractControlRequest.java| 80 .../common/requests/ControlledShutdownRequest.java | 27 +- .../requests/ControlledShutdownResponse.java | 3 +- .../kafka/common/requests/LeaderAndIsrRequest.java | 299 -- .../common/requests/LeaderAndIsrResponse.java | 18 +- .../kafka/common/requests/StopReplicaRequest.java | 133 +++--- .../kafka/common/requests/StopReplicaResponse.java | 19 +- .../common/requests/UpdateMetadataRequest.java | 455 +++-- .../common/requests/UpdateMetadataResponse.java| 4 +- .../apache/kafka/common/utils/CollectionUtils.java | 3 +- .../kafka/common/requests/ControlRequestTest.java | 87 .../common/requests/LeaderAndIsrResponseTest.java | 2 +- .../kafka/common/requests/RequestResponseTest.java | 35 +- .../common/requests/StopReplicaResponseTest.java | 3 +- core/src/main/scala/kafka/api/ApiVersion.scala | 12 +- .../controller/ControllerChannelManager.scala | 105 ++--- .../scala/kafka/controller/ControllerContext.scala | 33 +- .../scala/kafka/controller/KafkaController.scala | 80 +++- core/src/main/scala/kafka/server/KafkaApis.scala | 57 ++- core/src/main/scala/kafka/server/KafkaServer.scala | 14 +- .../main/scala/kafka/server/ReplicaManager.scala | 2 +- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 293 +++-- .../scala/kafka/zookeeper/ZooKeeperClient.scala| 158 --- .../kafka/api/AuthorizerIntegrationTest.scala | 10 +- .../controller/ControllerIntegrationTest.scala | 72 +++- .../controller/PartitionStateMachineTest.scala | 20 +- .../kafka/server/BrokerEpochIntegrationTest.scala | 242 +++ .../scala/unit/kafka/server/KafkaApisTest.scala| 4 +- .../unit/kafka/server/LeaderElectionTest.scala | 11 +- .../unit/kafka/server/MetadataCacheTest.scala | 13 +- .../unit/kafka/server/ReplicaManagerTest.scala | 18 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +- .../unit/kafka/server/ServerShutdownTest.scala | 6 +- .../unit/kafka/server/StopReplicaRequestTest.scala | 5 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 5 + .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 36 +- 39 files changed, 1599 insertions(+), 796 deletions(-) copy clients/src/main/java/org/apache/kafka/common/errors/{InvalidFetchSizeException.java => StaleBrokerEpochException.java} (84%) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java create mode 100644 clients/src/test/java/org/apache/kafka/common/requests/ControlRequestTest.java create mode 100755 core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
[kafka] branch 2.1 updated: KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new d4575d1 KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998) d4575d1 is described below commit d4575d139c1289571369d6192a726a41af831166 Author: huxi AuthorDate: Thu Dec 6 21:46:35 2018 +0800 KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998) On the follower side, for the empty `LogAppendInfo` retrieved from the leader, fetcherLagStats set the wrong lag for fetcherLagStats due to `nextOffset` is zero. --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 60d397d..797b0f5 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -272,10 +272,10 @@ abstract class AbstractFetcherThread(name: String, partitionData) logAppendInfoOpt.foreach { logAppendInfo => - val nextOffset = logAppendInfo.lastOffset + 1 + val validBytes = logAppendInfo.validBytes + val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset) - val validBytes = logAppendInfo.validBytes // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if (validBytes > 0 && partitionStates.contains(topicPartition)) { // Update partitionStates only if there is no exception during processPartitionData
[kafka] branch trunk updated: KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 87cc31c KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998) 87cc31c is described below commit 87cc31c4e7ea36e7e832a1d02d71480a91a75293 Author: huxi AuthorDate: Thu Dec 6 21:46:35 2018 +0800 KAFKA-7704: MaxLag.Replica metric is reported incorrectly (#5998) On the follower side, for the empty `LogAppendInfo` retrieved from the leader, fetcherLagStats set the wrong lag for fetcherLagStats due to `nextOffset` is zero. --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 2cee83c..02158fa 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -272,10 +272,10 @@ abstract class AbstractFetcherThread(name: String, partitionData) logAppendInfoOpt.foreach { logAppendInfo => - val nextOffset = logAppendInfo.lastOffset + 1 + val validBytes = logAppendInfo.validBytes + val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset) - val validBytes = logAppendInfo.validBytes // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if (validBytes > 0 && partitionStates.contains(topicPartition)) { // Update partitionStates only if there is no exception during processPartitionData
[kafka] branch trunk updated: KAFKA-4453 : Added code to separate controller connections and requests from the data plane (#5921)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8afce0e KAFKA-4453 : Added code to separate controller connections and requests from the data plane (#5921) 8afce0e is described below commit 8afce0e338e3715824e309f6289af6bc607ce020 Author: Mayuresh Gharat AuthorDate: Sun Jan 13 10:17:52 2019 -0800 KAFKA-4453 : Added code to separate controller connections and requests from the data plane (#5921) KIP-291 Implementation : Added code to separate controller connections and requests from the data plane. Tested with local deployment that the controller request are handled by the control plane and other requests are handled by the data plane. Also added unit tests in order to test the functionality. Author: Lucas Wang , Author: Mayuresh Gharat Reviewers: Joel Koshy , Jun Rao --- .../controller/ControllerChannelManager.scala | 8 +- .../main/scala/kafka/network/RequestChannel.scala | 13 +- .../main/scala/kafka/network/SocketServer.scala| 235 ++--- .../scala/kafka/server/DynamicBrokerConfig.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 63 +- .../scala/kafka/server/KafkaRequestHandler.scala | 10 +- core/src/main/scala/kafka/server/KafkaServer.scala | 38 +++- .../kafka/api/AdminClientIntegrationTest.scala | 4 +- .../kafka/api/AuthorizerIntegrationTest.scala | 10 +- .../integration/kafka/api/BaseQuotaTest.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 46 ++-- .../api/SaslSslAdminClientIntegrationTest.scala| 4 +- .../server/DynamicBrokerReconfigurationTest.scala | 6 +- .../test/scala/unit/kafka/admin/AdminTest.scala| 4 +- .../controller/ControllerIntegrationTest.scala | 45 +++- .../unit/kafka/network/SocketServerTest.scala | 218 ++- .../kafka/server/DynamicConfigChangeTest.scala | 4 +- .../scala/unit/kafka/server/KafkaConfigTest.scala | 25 +++ .../unit/kafka/server/MetadataRequestTest.scala| 8 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 4 +- .../scala/unit/kafka/zk/AdminZkClientTest.scala| 4 +- .../integration/utils/IntegrationTestUtils.java| 2 +- 23 files changed, 500 insertions(+), 257 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index b5c6a91..083e952 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -107,14 +107,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf private def addNewBroker(broker: Broker) { val messageQueue = new LinkedBlockingQueue[QueueItem] debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}") -val brokerNode = broker.node(config.interBrokerListenerName) +val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) +val brokerNode = broker.node(controllerToBrokerListenerName) val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ") val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( -config.interBrokerSecurityProtocol, +controllerToBrokerSecurityProtocol, JaasContext.Type.SERVER, config, -config.interBrokerListenerName, +controllerToBrokerListenerName, config.saslMechanismInterBrokerProtocol, time, config.saslInterBrokerHandshakeRequestEnable diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 00b0968..988c14f 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -41,6 +41,7 @@ object RequestChannel extends Logging { val RequestQueueSizeMetric = "RequestQueueSize" val ResponseQueueSizeMetric = "ResponseQueueSize" + val ControlPlaneMetricPrefix = "ControlPlane" val ProcessorMetricTag = "processor" def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled @@ -272,17 +273,19 @@ object RequestChannel extends Logging { } } -class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup { +class RequestChannel(val queueSize: Int, val metricNamePrefix : String
[kafka] branch trunk updated: KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 646ec94 KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202) 646ec94 is described below commit 646ec948794c927e4ffa5f96d60b5b9f7fe8f228 Author: Dhruvil Shah AuthorDate: Fri Jan 25 19:40:50 2019 -0800 KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202) Check if a partition is offline while iterating all partitions. Reviewers: Jun Rao --- core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5e41e35..955701a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -411,9 +411,13 @@ class ReplicaManager(val config: KafkaConfig, def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition) + // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after + // the iterator has been constructed could still be returned by this iterator. private def nonOfflinePartitionsIterator: Iterator[Partition] = allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition) + // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the + // iterator has been constructed may not be visible. private def offlinePartitionsIterator: Iterator[Partition] = allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition) @@ -1356,7 +1360,11 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") - nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) + +// Shrink ISRs for non offline partitions +allPartitions.keys.foreach { topicPartition => + nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) +} } /**
[kafka] branch trunk updated: KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (#6493)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 13e265a KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (#6493) 13e265a is described below commit 13e265ab3dfd69cdc3709b6f871418bcf1a2221f Author: Ivan Yurchenko AuthorDate: Tue Mar 26 03:50:12 2019 +0200 KAFKA-7986: Distinguish logging from different ZooKeeperClient instances (#6493) A broken can have more than one instance of ZooKeeperClient. For example, SimpleAclAuthorizer creates a separate ZooKeeperClient instance when configured. This commit makes it possible to optionally specify the name for the ZooKeeperClient instance. The name is specified only for a broker's ZooKeeperClient instances, but not for commands' and tests'. Reviewers: Jun Rao --- .../kafka/security/auth/SimpleAclAuthorizer.scala| 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +++-- .../main/scala/kafka/zookeeper/ZooKeeperClient.scala | 20 ++-- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 8a0b4a0..e39babf 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -98,7 +98,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { val time = Time.SYSTEM zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs, - zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer") + zkMaxInFlightRequests, time, "kafka.security", "SimpleAclAuthorizer", name=Some("Simple ACL authorizer")) zkClient.createAclPaths() extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1 diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 8fc5197..b5ee8cc 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -359,7 +359,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP def createZkClient(zkConnect: String, isSecure: Boolean) = KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, -config.zkMaxInFlightRequests, time) +config.zkMaxInFlightRequests, time, name = Some("Kafka server")) val chrootIndex = config.zkConnect.indexOf("/") val chrootOption = { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6d8d504..782ec2a 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -1820,9 +1820,10 @@ object KafkaZkClient { maxInFlightRequests: Int, time: Time, metricGroup: String = "kafka.server", -metricType: String = "SessionExpireListener") = { +metricType: String = "SessionExpireListener", +name: Option[String] = None) = { val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, - time, metricGroup, metricType) + time, metricGroup, metricType, name) new KafkaZkClient(zooKeeperClient, isSecure, time) } diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala index ad4da8b..c193ff2 100755 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala @@ -44,6 +44,7 @@ import scala.collection.mutable.Set * @param sessionTimeoutMs session timeout in milliseconds * @param connectionTimeoutMs connection timeout in milliseconds * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. + * @param name name of the client instance */ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, @@ -51,8 +52,23 @@ class ZooKeeperClient(connectString: String, maxInFlightRequests: Int, time: Time, metricGroup: String, - metricType: String) extends Logging with KafkaMetricsGroup { - this.logIdent = "[ZooKeeperClient] " + metricType: String, + name: Option[String])
[kafka] branch trunk updated: KAFKA-7283: Enable lazy mmap on index files and skip sanity check for segments below recovery point (#5498)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2932d32 KAFKA-7283: Enable lazy mmap on index files and skip sanity check for segments below recovery point (#5498) 2932d32 is described below commit 2932d32afb293b2c230a6d73b2eb134ef2828c56 Author: Zhanxiang (Patrick) Huang AuthorDate: Wed Feb 20 21:50:31 2019 -0800 KAFKA-7283: Enable lazy mmap on index files and skip sanity check for segments below recovery point (#5498) Per the KIP-263 discussion, we think we can improve broker restart time by avoiding performing costly disk operations when sanity checking index files for segments below recovery point on broker startup. This PR includes the following changes: 1. Mmap the index file and populate fields of the index file on-demand rather than performing costly disk operations when creating the index object on broker startup. 2. Skip sanity checks on the time index and offset index of segments. 1. For segment with offset below the flushed point (recovery point), these segments are safely flushed so we don't need to sanity check the index files. if there are indeed data corruption on disk, given that we don't sanity check the segment file, sanity checking only the indexes adds little benefit. 2. For segment with offset above the flushed point (recovery point), we will recover these segments in `recoveryLog()` (Log.scala) in any case so sanity checking the index files for these segments is redundant. We did experiments on a cluster with 15 brokers, each of which has ~3k segments (and there are 31.8k partitions with RF=3 which are evenly distributed across brokers; total bytes-in-rate is around 400 MBps). The results show that rolling bounce time reduces from 135 minutes to 55 minutes. Reviewers: Ismael Juma , Jun Rao --- core/src/main/scala/kafka/log/LogSegment.scala | 72 ++ core/src/main/scala/kafka/log/OffsetIndex.scala| 34 ++ core/src/main/scala/kafka/log/TimeIndex.scala | 34 ++ .../test/scala/unit/kafka/log/LogManagerTest.scala | 11 +++- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 10 +-- core/src/test/scala/unit/kafka/log/LogTest.scala | 43 - core/src/test/scala/unit/kafka/log/LogUtils.scala | 4 +- 7 files changed, 157 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 13093b4..9168ce0 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -53,14 +53,18 @@ import scala.math._ */ @nonthreadsafe class LogSegment private[log] (val log: FileRecords, - val offsetIndex: OffsetIndex, - val timeIndex: TimeIndex, + val lazyOffsetIndex: LazyOffsetIndex, + val lazyTimeIndex: LazyTimeIndex, val txnIndex: TransactionIndex, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, val time: Time) extends Logging { + def offsetIndex: OffsetIndex = lazyOffsetIndex.get + + def timeIndex: TimeIndex = lazyTimeIndex.get + def shouldRoll(rollParams: RollParams): Boolean = { val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs size > rollParams.maxSegmentBytes - rollParams.messagesSize || @@ -74,15 +78,16 @@ class LogSegment private[log] (val log: FileRecords, } def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = { -if (offsetIndex.file.exists) { - offsetIndex.sanityCheck() +if (lazyOffsetIndex.file.exists) { // Resize the time index file to 0 if it is newly created. if (timeIndexFileNewlyCreated) timeIndex.resize(0) - timeIndex.sanityCheck() + // Sanity checks for time index and offset index are skipped because + // we will recover the segments above the recovery point in recoverLog() + // in any case so sanity checking them here is redundant. txnIndex.sanityCheck() } -else throw new NoSuchFileException(s"Offset index file ${offsetIndex.file.getAbsolutePath} does not exist") +else throw new NoSuchFileException(s"Offset index file ${lazyOffsetIndex.file.getAbsolutePath} does not exist") } private var created = time.milliseconds @@ -94,8 +99,21 @@ class LogSegment private[log] (val log: FileRecords, private var rollingBasedTimestamp: Option[Long] = None /* The maximum timestamp we
[kafka] branch trunk updated: KAFKA-7864; validate partitions are 0-based (#6246)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 217f45e KAFKA-7864; validate partitions are 0-based (#6246) 217f45e is described below commit 217f45ed554b34d5221e1dd3db76e4be892661cf Author: Ryan Chen AuthorDate: Fri Feb 22 09:11:21 2019 -0800 KAFKA-7864; validate partitions are 0-based (#6246) Reviewers: Sriharsha Chintalapani , Jun Rao --- core/src/main/scala/kafka/zk/AdminZkClient.scala | 6 ++ .../kafka/admin/PreferredReplicaElectionCommandTest.scala | 4 ++-- .../unit/kafka/controller/ControllerIntegrationTest.scala | 4 ++-- core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala | 15 +++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 14e7d7e..b10a089 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -125,6 +125,12 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment) ) +val partitionSize = partitionReplicaAssignment.size +val sequenceSum = partitionSize * (partitionSize - 1) / 2 +if (partitionReplicaAssignment.size != partitionReplicaAssignment.toSet.size || +partitionReplicaAssignment.keys.filter(_ >= 0).sum != sequenceSum) +throw new InvalidReplicaAssignmentException("partitions should be a consecutive 0-based integer sequence") + LogConfig.validate(config) } diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala index d03ec04..cf752b8 100644 --- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaElectionCommandTest.scala @@ -49,9 +49,9 @@ class PreferredReplicaElectionCommandTest extends ZooKeeperTestHarness with Logg @Test def testBasicPreferredReplicaElection() { -val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) +val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" -val partition = 1 +val partition = 0 val preferredReplica = 0 // create brokers val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2") diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 08747a8..f167876 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -385,9 +385,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { @Test def testControlledShutdown() { -val expectedReplicaAssignment = Map(1 -> List(0, 1, 2)) +val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topic = "test" -val partition = 1 +val partition = 0 // create brokers val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps) servers = serverConfigs.reverseMap(s => TestUtils.createServer(s)) diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index c120caa..fa8635f 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -66,6 +66,21 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware adminZkClient.createTopicWithAssignment("test", topicConfig, Map(0->Seq(0,1), 1->Seq(0))) } +// partitions should be 0-based +intercept[InvalidReplicaAssignmentException] { + adminZkClient.createTopicWithAssignment("test", topicConfig, Map(1->Seq(1,2), 2->Seq(1,2))) +} + +// partitions should be 0-based and consecutive +intercept[InvalidReplicaAssignmentException] { + adminZkClient.createTopicWithAssignment("test", topicConfig, Map(0->Seq(1,2), 0->Seq(1,2), 3->Seq(1,2))) +} + +// partitions should be 0-based and consecutive +intercept[InvalidReplicaAssignmentException] { + adminZkClient.createTopicWithAssignment("test", topicConfig, Map(-1->Seq(1,2), 1->Seq(1,2), 2->Seq(1,2), 4->Seq(1,2))) +} + // good assignment val assignment = Map(0 -> List(0, 1, 2), 1 -> List(1, 2, 3))
[kafka] branch trunk updated: KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ef89cf4 KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168) ef89cf4 is described below commit ef89cf4eb687dbcca719acca09c98ded001d12dd Author: Dhruvil Shah AuthorDate: Fri Jan 25 14:11:31 2019 -0800 KAFKA-7838: Log leader and follower end offsets when shrinking ISR (#6168) Reviewers: Jun Rao --- core/src/main/scala/kafka/cluster/Partition.scala | 19 ++- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index ca3abbb..e73 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -634,16 +634,25 @@ class Partition(val topicPartition: TopicPartition, leaderReplicaIfLocal match { case Some(leaderReplica) => val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs) - if(outOfSyncReplicas.nonEmpty) { + if (outOfSyncReplicas.nonEmpty) { val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas assert(newInSyncReplicas.nonEmpty) -info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","), - newInSyncReplicas.map(_.brokerId).mkString(","))) +info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s." + .format(inSyncReplicas.map(_.brokerId).mkString(","), +newInSyncReplicas.map(_.brokerId).mkString(","), +leaderReplica.highWatermark.messageOffset, +leaderReplica.logEndOffset.messageOffset, +outOfSyncReplicas.map { replica => + s"(brokerId: ${replica.brokerId}, endOffset: ${replica.logEndOffset.messageOffset})" +}.mkString(" ") + ) +) + // update ISR in zk and in cache updateIsr(newInSyncReplicas) -// we may need to increment high watermark since ISR could be down to 1 - replicaManager.isrShrinkRate.mark() + +// we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) } else { false
[kafka] branch trunk updated: KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (#3848)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 269b652 KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (#3848) 269b652 is described below commit 269b65279c746bc54c611141a5a6509f9b310f11 Author: Tom Bentley AuthorDate: Fri Jan 25 22:06:18 2019 + KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (#3848) See also KIP-183. This implements the following algorithm: AdminClient sends ElectPreferredLeadersRequest. KafakApis receives ElectPreferredLeadersRequest and delegates to ReplicaManager.electPreferredLeaders() ReplicaManager delegates to KafkaController.electPreferredLeaders() KafkaController adds a PreferredReplicaLeaderElection to the EventManager, ReplicaManager.electPreferredLeaders()'s callback uses the delayedElectPreferredReplicasPurgatory to wait for the results of the election to appear in the metadata cache. If there are no results because of errors, or because the preferred leaders are already leading the partitions then a response is returned immediately. In the EventManager work thread the preferred leader is elected as follows: The EventManager runs PreferredReplicaLeaderElection.process() process() calls KafkaController.onPreferredReplicaElectionWithResults() KafkaController.onPreferredReplicaElectionWithResults() calls the PartitionStateMachine.handleStateChangesWithResults() to perform the election (asynchronously the PSM will send LeaderAndIsrRequest to the new and old leaders and UpdateMetadataRequest to all brokers) then invokes the callback. Reviewers: Colin P. McCabe , Jun Rao --- checkstyle/import-control.xml | 2 + .../org/apache/kafka/clients/NetworkClient.java| 6 +- .../apache/kafka/clients/admin/AdminClient.java| 52 .../admin/ElectPreferredLeadersOptions.java| 31 ++ .../clients/admin/ElectPreferredLeadersResult.java | 136 + .../kafka/clients/admin/KafkaAdminClient.java | 33 ++ .../PreferredLeaderNotAvailableException.java | 28 ++ .../org/apache/kafka/common/protocol/ApiKeys.java | 14 +- .../org/apache/kafka/common/protocol/Errors.java | 5 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java| 4 +- .../requests/ElectPreferredLeadersRequest.java | 129 .../requests/ElectPreferredLeadersResponse.java| 78 + .../message/ElectPreferredLeadersRequest.json | 33 ++ .../message/ElectPreferredLeadersResponse.json | 39 +++ .../src/main/resources/common/message/README.md| 2 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 54 .../kafka/clients/admin/MockAdminClient.java | 4 + .../kafka/common/requests/RequestContextTest.java | 3 +- .../kafka/common/requests/RequestResponseTest.java | 40 ++- .../PreferredReplicaLeaderElectionCommand.scala| 217 +++-- .../kafka/controller/ControllerEventManager.scala | 5 + .../scala/kafka/controller/KafkaController.scala | 159 -- .../kafka/controller/PartitionStateMachine.scala | 40 ++- .../kafka/server/DelayedElectPreferredLeader.scala | 89 ++ core/src/main/scala/kafka/server/KafkaApis.scala | 49 +++ .../main/scala/kafka/server/MetadataCache.scala| 6 + .../main/scala/kafka/server/ReplicaManager.scala | 37 +++ .../kafka/api/AdminClientIntegrationTest.scala | 203 - .../kafka/api/AuthorizerIntegrationTest.scala | 20 +- ...PreferredReplicaLeaderElectionCommandTest.scala | 337 + .../AbstractCoordinatorConcurrencyTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala| 2 +- .../unit/kafka/server/ReplicaManagerTest.scala | 8 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 11 + 35 files changed, 1798 insertions(+), 82 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a0bf740..0d316c5 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -116,6 +116,7 @@ + @@ -140,6 +141,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 144987e..3973701 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -676,7 +676,8 @@ public class NetworkClient implements KafkaClient { public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { Struct
svn commit: r34620 - in /release/kafka/2.3.0: ./ javadoc/ javadoc/org/ javadoc/org/apache/ javadoc/org/apache/kafka/ javadoc/org/apache/kafka/clients/ javadoc/org/apache/kafka/clients/admin/ javadoc/o
Author: junrao Date: Tue Jun 25 00:38:24 2019 New Revision: 34620 Log: release 2.3.0 [This commit notification would consist of 181 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
[kafka] branch trunk updated: Update KafkaConfig.scala (#7113)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cdea348 Update KafkaConfig.scala (#7113) cdea348 is described below commit cdea3480c693d270064ebcb3871d08dee053ce7f Author: Gemma Singleton <49679170+gemma-single...@users.noreply.github.com> AuthorDate: Wed Jul 31 18:06:55 2019 +0200 Update KafkaConfig.scala (#7113) Better clarify the auto leader rebalance config documentation to reflect additional leader.imbalance.per.broker.percentage config description. Reviewers: Stanislav Kozlovski , Jun Rao --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5223ebd..a2db653 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -688,7 +688,7 @@ object KafkaConfig { val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory" val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory" val DeleteRecordsPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the delete records request purgatory" - val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals" + val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered." val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage." val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller" val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss"
[kafka] branch trunk updated (a8aedc8 -> e9c61f6)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from a8aedc8 KAFKA-8696: clean up Sum/Count/Total metrics (#7057) add e9c61f6 MINOR: Ensure in-memory metadata is removed before physical deletion of segment (#7106) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/Log.scala | 72 ++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 21 +++ 2 files changed, 64 insertions(+), 29 deletions(-)
[kafka] branch trunk updated (a28447a -> dd22b3f)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from a28447a MINOR: don't assign standby tasks with no logged state (#8147) add dd22b3f KAFKA-9498; Topic validation during the topic creation triggers unnecessary TopicChange events (#8062) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/admin/TopicCommand.scala | 2 +- .../scala/kafka/controller/KafkaController.scala | 4 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 +- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 23 - .../scala/kafka/zookeeper/ZooKeeperClient.scala| 6 +-- .../kafka/api/ConsumerTopicCreationTest.scala | 2 +- .../scala/unit/kafka/admin/TopicCommandTest.scala | 2 +- .../security/auth/SimpleAclAuthorizerTest.scala| 2 +- .../security/authorizer/AclAuthorizerTest.scala| 2 +- .../scala/unit/kafka/zk/AdminZkClientTest.scala| 2 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala| 57 -- .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 43 +--- .../integration/utils/EmbeddedKafkaCluster.java| 10 ++-- 14 files changed, 121 insertions(+), 40 deletions(-)
[kafka] branch trunk updated (ebcdcd9 -> 72122fc)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from ebcdcd9 KAFKA-8025: Fix flaky RocksDB test (#8126) add 72122fc KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset (#8089) No new revisions were added by this update. Summary of changes: .../main/scala/kafka/log/LogCleanerManager.scala | 41 +++--- .../unit/kafka/log/LogCleanerManagerTest.scala | 87 +++--- 2 files changed, 104 insertions(+), 24 deletions(-)
[kafka] branch 2.4 updated: KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset … (#8136)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new ce4aa9c KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset … (#8136) ce4aa9c is described below commit ce4aa9c0943436f77393609b8db986d5ffb7a8b6 Author: David Mao <47232755+sple...@users.noreply.github.com> AuthorDate: Thu Feb 20 18:14:26 2020 -0800 KAFKA-6266: Repeated occurrence of WARN Resetting first dirty offset … (#8136) Previously, checkpointed offsets for a log were only updated if the log was chosen for cleaning once the cleaning job completes. This caused issues in cases where logs with invalid checkpointed offsets would repeatedly emit warnings if the log with an invalid cleaning checkpoint wasn't chosen for cleaning. Proposed fix is to update the checkpointed offset for logs with invalid checkpoints regardless of whether it gets chosen for cleaning. Reviewers: Anna Povzner , Jun Rao --- .../main/scala/kafka/log/LogCleanerManager.scala | 83 + .../unit/kafka/log/LogCleanerManagerTest.scala | 87 +++--- 2 files changed, 125 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 32ad708..19f7e4d 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -97,32 +97,32 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */ -for (dir <- logDirs) { - newGauge( -"uncleanable-bytes", -new Gauge[Long] { - def value = { -inLock(lock) { - uncleanablePartitions.get(dir.getAbsolutePath) match { -case Some(partitions) => { - val lastClean = allCleanerCheckpoints - val now = Time.SYSTEM.milliseconds - partitions.map { tp => -val log = logs.get(tp) -val lastCleanOffset = lastClean.get(tp) -val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now) -val (_, uncleanableBytes) = calculateCleanableBytes(log, firstDirtyOffset, firstUncleanableDirtyOffset) -uncleanableBytes - }.sum -} -case _ => 0 + for (dir <- logDirs) { +newGauge( + "uncleanable-bytes", + new Gauge[Long] { +def value = { + inLock(lock) { +uncleanablePartitions.get(dir.getAbsolutePath) match { + case Some(partitions) => { +val lastClean = allCleanerCheckpoints +val now = Time.SYSTEM.milliseconds +partitions.map { tp => + val log = logs.get(tp) + val lastCleanOffset = lastClean.get(tp) + val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now) + val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset) + uncleanableBytes +}.sum } + case _ => 0 } } -}, -Map("logDirectory" -> dir.getAbsolutePath) - ) -} +} + }, + Map("logDirectory" -> dir.getAbsolutePath) +) + } /* a gauge for tracking the cleanable ratio of the dirtiest log */ @volatile private var dirtiestLogCleanableRatio = 0.0 @@ -187,11 +187,14 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], case (topicPartition, log) => // create a LogToClean instance for each try { val lastCleanOffset = lastClean.get(topicPartition) -val (firstDirtyOffset, firstUncleanableDirtyOffset) = cleanableOffsets(log, lastCleanOffset, now) -val compactionDelayMs = maxCompactionDelay(log, firstDirtyOffset, now) +val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now) +// update checkpoint for logs with invalid checkpointed offsets +if (offsetsToClean.forceUpdateCheckpoint) + updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset)) +val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now) preCleanStats.updateMaxCompactionDelay(compactionDelayMs) -LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset, compactionDelayMs &g
[kafka] branch trunk updated: [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b8c292c [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) b8c292c is described below commit b8c292c36187d3feda8ae0ce22d58604115b8507 Author: Steve Rodrigues <42848865+steve...@users.noreply.github.com> AuthorDate: Tue Apr 14 22:27:50 2020 -0700 [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages. This was determined to be because the active segment was getting mistakenly picked for cleaning, and because the logSegments code handles (start == end) cases only for (start, end) on a segment boundary: the intent is to return a null list, but if they're not on a segment boundary, the routine returns that segment. This fix has two parts: It changes logSegments to handle start==end by returning an empty List always. It changes the definition of calculateCleanableBytes to not consider anything past the UncleanableOffset; previously, it would potentially shift the UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now in the context of the fix for (1) but it makes the code read more like the model that the code is attempting to follow. These changes require modifications to a few test cases that handled this particular test case; they were introduced in the context of KAFKA-8764. Those situations are now handled elsewhere in code, but the tests themselves allowed a DirtyOffset in the active segment, and expected an active segment to be selected for cleaning. Reviewer: Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 23 -- .../main/scala/kafka/log/LogCleanerManager.scala | 2 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 10 --- core/src/test/scala/unit/kafka/log/LogTest.scala | 35 ++ 4 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b1a0f05..a8148f6 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2124,17 +2124,22 @@ class Log(@volatile private var _dir: File, /** * Get all segments beginning with the segment that includes "from" and ending with the segment - * that includes up to "to-1" or the end of the log (if to > logEndOffset) + * that includes up to "to-1" or the end of the log (if to > logEndOffset). */ def logSegments(from: Long, to: Long): Iterable[LogSegment] = { -lock synchronized { - val view = Option(segments.floorKey(from)).map { floor => -if (to < floor) - throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + -s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to") -segments.subMap(floor, to) - }.getOrElse(segments.headMap(to)) - view.values.asScala +if (from == to) { + // Handle non-segment-aligned empty sets + List.empty[LogSegment] +} else if (to < from) { + throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + +s"from offset $from which is greater than limit offset $to") +} else { + lock synchronized { +val view = Option(segments.floorKey(from)).map { floor => + segments.subMap(floor, to) +}.getOrElse(segments.headMap(to)) +view.values.asScala + } } } diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index f9ea46b..473f1fb 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -599,7 +599,7 @@ private[log] object LogCleanerManager extends Logging { def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = { val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment) val firstUncleanableOffset = firstUncle
[kafka] branch trunk updated: MINOR: reduce impact of trace logging in replica hot path (#8468)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 851b45c MINOR: reduce impact of trace logging in replica hot path (#8468) 851b45c is described below commit 851b45c842a9d35c8299f87725b52b1b6523271e Author: Lucas Bradstreet AuthorDate: Fri Apr 17 14:22:30 2020 -0700 MINOR: reduce impact of trace logging in replica hot path (#8468) The impact of trace logging is normally small, on the order of 40ns per getEffectiveLevel check, however this adds up with trace is called multiple times per partition in the replica fetch hot path. This PR removes some trace logs that are not very useful and reduces cases where the level is checked over and over for one fetch request. Reviewers: Ismael Juma , Jun Rao --- core/src/main/scala/kafka/cluster/Replica.scala | 2 -- .../scala/kafka/server/ReplicaFetcherThread.scala| 7 --- .../src/main/scala/kafka/server/ReplicaManager.scala | 20 +--- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index f9de7ba..d1fc345 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -83,7 +83,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log lastFetchLeaderLogEndOffset = leaderEndOffset lastFetchTimeMs = followerFetchTimeMs updateLastSentHighWatermark(lastSentHighwatermark) -trace(s"Updated state of replica to $this") } /** @@ -96,7 +95,6 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log */ private def updateLastSentHighWatermark(highWatermark: Long): Unit = { _lastSentHighWatermark = highWatermark -trace(s"Updated HW of replica to $highWatermark") } def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index a42f689..ec6d75c 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -149,6 +149,7 @@ class ReplicaFetcherThread(name: String, override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = { +val logTrace = isTraceEnabled val partition = replicaMgr.nonOfflinePartition(topicPartition).get val log = partition.localLogOrException val records = toMemoryRecords(partitionData.records) @@ -159,14 +160,14 @@ class ReplicaFetcherThread(name: String, throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format( topicPartition, fetchOffset, log.logEndOffset)) -if (isTraceEnabled) +if (logTrace) trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d" .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) // Append the leader's messages to the log val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) -if (isTraceEnabled) +if (logTrace) trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s" .format(log.logEndOffset, records.sizeInBytes, topicPartition)) val leaderLogStartOffset = partitionData.logStartOffset @@ -175,7 +176,7 @@ class ReplicaFetcherThread(name: String, // These values will be computed upon becoming leader or handling a preferred read replica fetch. val followerHighWatermark = log.updateHighWatermark(partitionData.highWatermark) log.maybeIncrementLogStartOffset(leaderLogStartOffset) -if (isTraceEnabled) +if (logTrace) trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark") // Traffic from both in-sync and out of sync replicas are accounted for in replication quota to ensure total replication diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9659172..4d917d5 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -842,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig, origin: AppendOrigin,
[kafka] branch trunk updated: MINOR: Adding github whitelist (#8523)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dff2192 MINOR: Adding github whitelist (#8523) dff2192 is described below commit dff2192cfaf747d56d821645a8d45e8ac8ac08d5 Author: Richard Yu AuthorDate: Mon Apr 20 14:42:59 2020 -0700 MINOR: Adding github whitelist (#8523) Reviewer: Jun Rao --- .asf.yaml | 4 1 file changed, 4 insertions(+) diff --git a/.asf.yaml b/.asf.yaml index ae0dede..0a13850 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -20,3 +20,7 @@ notifications: issues: j...@kafka.apache.org pullrequests: j...@kafka.apache.org jira_options: link label + +jenkins: + github_whitelist: +- ConcurrencyPractitioner
[kafka] branch trunk updated: KAFKA-3720: Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms (#8399)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e032a36 KAFKA-3720: Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms (#8399) e032a36 is described below commit e032a360708cec2284f714e4cae388066064d61c Author: Sönke Liebau AuthorDate: Sat Apr 11 01:48:29 2020 +0200 KAFKA-3720: Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms (#8399) Change TimeoutException to BufferExhaustedException when no memory can be allocated for a record within max.block.ms Refactored BufferExhaustedException to be a subclass of TimeoutException so existing code that catches TimeoutExceptions keeps working. Added handling to count these Exceptions in the metric "buffer-exhausted-records". Test Strategy There were existing test cases to check this behavior, which I refactored. I then added an extra case to check whether the expected Exception is actually thrown, which was not covered by current tests. Reviewers: Ismael Juma , Jun Rao --- .../clients/producer/BufferExhaustedException.java | 12 ++ .../kafka/clients/producer/KafkaProducer.java | 5 - .../clients/producer/internals/BufferPool.java | 11 +++-- .../producer/internals/RecordAccumulator.java | 7 -- .../clients/producer/internals/BufferPoolTest.java | 26 +++--- .../kafka/api/PlaintextProducerSendTest.scala | 13 +++ 6 files changed, 44 insertions(+), 30 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java index be840db..292bb4e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java @@ -16,13 +16,17 @@ */ package org.apache.kafka.clients.producer; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.TimeoutException; /** - * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at - * which data can be sent for long enough for the allocated buffer to be exhausted. + * This exception is thrown if the producer cannot allocate memory for a record within max.block.ms due to the buffer + * being too full. + * + * In earlier versions a TimeoutException was thrown instead of this. To keep existing catch-clauses working + * this class extends TimeoutException. + * */ -public class BufferExhaustedException extends KafkaException { +public class BufferExhaustedException extends TimeoutException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 9c35e78..deecfd8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -966,11 +966,6 @@ public class KafkaProducer implements Producer { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); -} catch (BufferExhaustedException e) { -this.errors.record(); -this.metrics.sensor("buffer-exhausted-records").record(); -this.interceptors.onSendError(record, tp, e); -throw e; } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java index b49a7e2..ee84c7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java @@ -23,9 +23,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Meter; @@ -83,6 +83,12 @@ public class BufferPool {
[kafka] branch trunk updated (008a3b2 -> c59835c)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 008a3b2 KAFKA-9707: Fix InsertField.Key should apply to keys of tombstone records (#8280) add c59835c KAFKA-8470: State change logs should not be in TRACE level (#8320) No new revisions were added by this update. Summary of changes: config/log4j.properties| 2 +- core/src/main/scala/kafka/cluster/Partition.scala | 23 +++-- .../controller/ControllerChannelManager.scala | 35 --- .../scala/kafka/controller/KafkaController.scala | 2 +- .../kafka/controller/PartitionStateMachine.scala | 13 ++- .../kafka/controller/ReplicaStateMachine.scala | 36 --- .../main/scala/kafka/server/MetadataCache.scala| 36 --- .../main/scala/kafka/server/ReplicaManager.scala | 107 ++--- 8 files changed, 146 insertions(+), 108 deletions(-)
[kafka] branch trunk updated: Minor: remove redundant check in auto preferred leader election (#8566)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 77ac06f Minor: remove redundant check in auto preferred leader election (#8566) 77ac06f is described below commit 77ac06f3f1921d118b9854ee7b8a4d3b58e832a4 Author: Leonard Ge <62600326+leonar...@users.noreply.github.com> AuthorDate: Tue Apr 28 17:12:00 2020 +0100 Minor: remove redundant check in auto preferred leader election (#8566) This is a minor follower up PR of #8524 Reviewer: Jun Rao --- core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 82be66a..c6916a2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1065,7 +1065,7 @@ class KafkaController(val config: KafkaConfig, if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { // do this check only if the broker is live and there are no partitions being reassigned currently // and preferred replica election is not in progress -val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && +val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && controllerContext.allTopics.contains(tp.topic) &&
[kafka] branch trunk updated (2bc87de -> 77ac06f)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 2bc87de MINOR: Update the link to the Raft paper in docs (#8560) add 77ac06f Minor: remove redundant check in auto preferred leader election (#8566) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/controller/KafkaController.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[kafka] branch trunk updated: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new db9e55a KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524) db9e55a is described below commit db9e55a50f93d82e4aad5e4f82a13fac0e93759e Author: Leonard Ge <62600326+leonar...@users.noreply.github.com> AuthorDate: Mon Apr 27 21:51:10 2020 +0100 KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524) In this commit we made sure that the auto leader election only happens after the newly starter broker is in the isr. No accompany tests are added due to the fact that: this is a change to the private method and no public facing change is made it is hard to create tests for this change without considerable effort Reviewers: Stanislav Kozlovski , Jun Rao --- core/src/main/scala/kafka/controller/KafkaController.scala | 13 - .../unit/kafka/controller/ControllerIntegrationTest.scala | 4 ++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 9ad7b6f..82be66a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig, val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && - controllerContext.allTopics.contains(tp.topic)) + controllerContext.allTopics.contains(tp.topic) && + canPreferredReplicaBeLeader(tp) + ) onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered) } } } + private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = { +val assignment = controllerContext.partitionReplicaAssignment(tp) +val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp)) +val isr = controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr +PartitionLeaderElectionAlgorithms + .preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet) + .nonEmpty + } + private def processAutoPreferredReplicaLeaderElection(): Unit = { if (!isActive) return try { diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index c4b5f47..c7a1cd5 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -433,8 +433,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch, "failed to get expected partition state upon topic creation") -servers(1).shutdown() -servers(1).awaitShutdown() +servers(otherBrokerId).shutdown() +servers(otherBrokerId).awaitShutdown() TestUtils.waitUntilTrue(() => { val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) leaderIsrAndControllerEpochMap.contains(tp) &&
[kafka] branch 2.4 updated: [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8543)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.4 by this push: new 9d447be [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8543) 9d447be is described below commit 9d447be1ef8399bb162944bd0ded02384162f41f Author: Steve Rodrigues <42848865+steve...@users.noreply.github.com> AuthorDate: Wed Apr 29 11:10:12 2020 -0700 [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8543) In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages. This was determined to be because the active segment was getting mistakenly picked for cleaning, and because the logSegments code handles (start == end) cases only for (start, end) on a segment boundary: the intent is to return a null list, but if they're not on a segment boundary, the routine returns that segment. This fix has two parts: It changes logSegments to handle start==end by returning an empty List always. It changes the definition of calculateCleanableBytes to not consider anything past the UncleanableOffset; previously, it would potentially shift the UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now in the context of the fix for (1) but it makes the code read more like the model that the code is attempting to follow. These changes require modifications to a few test cases that handled this particular test case; they were introduced in the context of KAFKA-8764. Those situations are now handled elsewhere in code, but the tests themselves allowed a DirtyOffset in the active segment, and expected an active segment to be selected for cleaning. Reviewer: Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 23 -- .../main/scala/kafka/log/LogCleanerManager.scala | 2 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 10 --- core/src/test/scala/unit/kafka/log/LogTest.scala | 35 ++ 4 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 91bb1eb..401cb2c 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2140,17 +2140,22 @@ class Log(@volatile var dir: File, /** * Get all segments beginning with the segment that includes "from" and ending with the segment - * that includes up to "to-1" or the end of the log (if to > logEndOffset) + * that includes up to "to-1" or the end of the log (if to > logEndOffset). */ def logSegments(from: Long, to: Long): Iterable[LogSegment] = { -lock synchronized { - val view = Option(segments.floorKey(from)).map { floor => -if (to < floor) - throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + -s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to") -segments.subMap(floor, to) - }.getOrElse(segments.headMap(to)) - view.values.asScala +if (from == to) { + // Handle non-segment-aligned empty sets + List.empty[LogSegment] +} else if (to < from) { + throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + +s"from offset $from which is greater than limit offset $to") +} else { + lock synchronized { +val view = Option(segments.floorKey(from)).map { floor => + segments.subMap(floor, to) +}.getOrElse(segments.headMap(to)) +view.values.asScala + } } } diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 19f7e4d..04a97fd 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -607,7 +607,7 @@ private[log] object LogCleanerManager extends Logging { def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = { val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment) val firstUncleanableOffset = firstUncle
[kafka] branch 2.5 updated: [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8542)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.5 by this push: new 6308ffb [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8542) 6308ffb is described below commit 6308ffb017bb035731b49a24617db94a3f71b2f7 Author: Steve Rodrigues <42848865+steve...@users.noreply.github.com> AuthorDate: Wed Apr 29 15:15:57 2020 -0700 [KAFKA-9826] Handle an unaligned first dirty offset during log cleaning. (#8469) (#8542) In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages. This was determined to be because the active segment was getting mistakenly picked for cleaning, and because the logSegments code handles (start == end) cases only for (start, end) on a segment boundary: the intent is to return a null list, but if they're not on a segment boundary, the routine returns that segment. This fix has two parts: It changes logSegments to handle start==end by returning an empty List always. It changes the definition of calculateCleanableBytes to not consider anything past the UncleanableOffset; previously, it would potentially shift the UncleanableOffset to match the firstDirtyOffset even if the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now in the context of the fix for (1) but it makes the code read more like the model that the code is attempting to follow. These changes require modifications to a few test cases that handled this particular test case; they were introduced in the context of KAFKA-8764. Those situations are now handled elsewhere in code, but the tests themselves allowed a DirtyOffset in the active segment, and expected an active segment to be selected for cleaning. Reviewer: Jun Rao --- core/src/main/scala/kafka/log/Log.scala| 23 -- .../main/scala/kafka/log/LogCleanerManager.scala | 2 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 10 --- core/src/test/scala/unit/kafka/log/LogTest.scala | 35 ++ 4 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a6b335a..36f4859 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2120,17 +2120,22 @@ class Log(@volatile var dir: File, /** * Get all segments beginning with the segment that includes "from" and ending with the segment - * that includes up to "to-1" or the end of the log (if to > logEndOffset) + * that includes up to "to-1" or the end of the log (if to > logEndOffset). */ def logSegments(from: Long, to: Long): Iterable[LogSegment] = { -lock synchronized { - val view = Option(segments.floorKey(from)).map { floor => -if (to < floor) - throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + -s"from offset $from mapping to segment with base offset $floor, which is greater than limit offset $to") -segments.subMap(floor, to) - }.getOrElse(segments.headMap(to)) - view.values.asScala +if (from == to) { + // Handle non-segment-aligned empty sets + List.empty[LogSegment] +} else if (to < from) { + throw new IllegalArgumentException(s"Invalid log segment range: requested segments in $topicPartition " + +s"from offset $from which is greater than limit offset $to") +} else { + lock synchronized { +val view = Option(segments.floorKey(from)).map { floor => + segments.subMap(floor, to) +}.getOrElse(segments.headMap(to)) +view.values.asScala + } } } diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 4f75b99..ab69c4b 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -582,7 +582,7 @@ private[log] object LogCleanerManager extends Logging { def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = { val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment) val firstUncleanableOffset = firstUncle
[kafka] branch trunk updated (0a50973 -> bd42734)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 0a50973 KAFKA-9864: Avoid expensive QuotaViolationException usage (#8477) add bd42734 MINOR: Serialize state change logs for handling LeaderAndIsr and StopReplica requests (#8493) No new revisions were added by this update. Summary of changes: .../main/scala/kafka/server/ReplicaManager.scala | 40 +++--- 1 file changed, 20 insertions(+), 20 deletions(-)
[kafka] branch trunk updated: KAFKA-9711 The authentication failure caused by SSLEngine#beginHandshake is not properly caught and handled (#8287)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b8e508c KAFKA-9711 The authentication failure caused by SSLEngine#beginHandshake is not properly caught and handled (#8287) b8e508c is described below commit b8e508c823517c857c9cfa8a3de77ab374549fde Author: Chia-Ping Tsai AuthorDate: Wed Mar 25 09:45:05 2020 +0800 KAFKA-9711 The authentication failure caused by SSLEngine#beginHandshake is not properly caught and handled (#8287) SSLEngine#beginHandshake is possible to throw authentication failures (for example, no suitable cipher suites) so we ought to catch SSLException and then convert it to SslAuthenticationException so as to process authentication failures correctly. Reviewers: Jun Rao --- .../kafka/common/network/SslTransportLayer.java| 9 ++-- .../common/network/SslTransportLayerTest.java | 25 -- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 6f0e440..0b11cd1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -267,8 +267,13 @@ public class SslTransportLayer implements TransportLayer { */ @Override public void handshake() throws IOException { -if (state == State.NOT_INITALIZED) -startHandshake(); +if (state == State.NOT_INITALIZED) { +try { +startHandshake(); +} catch (SSLException e) { +maybeProcessHandshakeFailure(e, false, null); +} +} if (ready()) throw renegotiationException(); if (state == State.CLOSING) diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index a494d50..f5fedc8 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.Java; import org.apache.kafka.common.utils.LogContext; @@ -40,6 +40,7 @@ import org.junit.runners.Parameterized; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLServerSocketFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; @@ -52,8 +53,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -578,6 +579,26 @@ public class SslTransportLayerTest { server.verifyAuthenticationMetrics(1, 2); } +@Test +public void testUnsupportedCipher() throws Exception { +String[] cipherSuites = ((SSLServerSocketFactory) SSLServerSocketFactory.getDefault()).getSupportedCipherSuites(); +if (cipherSuites != null && cipherSuites.length > 1) { +sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); +sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[0])); +sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); +sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[1])); + +server = createEchoServer(SecurityProtocol.SSL); +createSelector(sslClientConfigs); + +checkAuthentiationFailed("1", "TLSv1.1"); +server.verifyAuthenticationMetrics(0, 1); + +checkAuthentiationFailed("2", "TLSv1"); +server.verifyAuthenticationMetrics(0, 2); +} +} + /** Checks connection failed using the specified {@code tlsVersion}. */ private void checkAuthentiationFailed(
[kafka] branch trunk updated (86013dc -> 6bba661)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 86013dc MINOR: add ImplicitLinkedHashCollection#moveToEnd (#9269) add 6bba661 MINOR: remove DelayedOperations.checkAndCompleteFetch (#9278) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/cluster/Partition.scala | 4 - .../scala/unit/kafka/cluster/PartitionTest.scala | 128 ++--- 2 files changed, 7 insertions(+), 125 deletions(-)
[kafka] branch trunk updated (86013dc -> 6bba661)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 86013dc MINOR: add ImplicitLinkedHashCollection#moveToEnd (#9269) add 6bba661 MINOR: remove DelayedOperations.checkAndCompleteFetch (#9278) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/cluster/Partition.scala | 4 - .../scala/unit/kafka/cluster/PartitionTest.scala | 128 ++--- 2 files changed, 7 insertions(+), 125 deletions(-)
[kafka] branch trunk updated (dd2b9ec -> c2273ad)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from dd2b9ec KAFKA-5636: Improve handling of "early" records in sliding windows (#9157) add c2273ad KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/cluster/Partition.scala | 22 +--- .../kafka/coordinator/group/DelayedJoin.scala | 11 +- .../coordinator/group/GroupMetadataManager.scala | 2 +- core/src/main/scala/kafka/log/Log.scala| 13 ++- core/src/main/scala/kafka/server/ActionQueue.scala | 56 ++ .../main/scala/kafka/server/DelayedOperation.scala | 118 + core/src/main/scala/kafka/server/KafkaApis.scala | 4 + .../main/scala/kafka/server/ReplicaManager.scala | 31 ++ .../AbstractCoordinatorConcurrencyTest.scala | 10 +- .../group/GroupCoordinatorConcurrencyTest.scala| 65 .../TransactionCoordinatorConcurrencyTest.scala| 5 +- .../unit/kafka/server/DelayedOperationTest.scala | 96 +++-- 12 files changed, 257 insertions(+), 176 deletions(-) create mode 100644 core/src/main/scala/kafka/server/ActionQueue.scala
[kafka] branch trunk updated: KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new c2273ad KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657) c2273ad is described below commit c2273adc25b2bab0a3ac95bf7844fedf2860b40b Author: Chia-Ping Tsai AuthorDate: Thu Sep 10 05:42:37 2020 +0800 KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657) The main changes of this PR are shown below. 1. replace tryLock by lock for DelayedOperation#maybeTryComplete 2. complete the delayed requests without holding group lock Reviewers: Ismael Juma , Jun Rao --- core/src/main/scala/kafka/cluster/Partition.scala | 22 +--- .../kafka/coordinator/group/DelayedJoin.scala | 11 +- .../coordinator/group/GroupMetadataManager.scala | 2 +- core/src/main/scala/kafka/log/Log.scala| 13 ++- core/src/main/scala/kafka/server/ActionQueue.scala | 56 ++ .../main/scala/kafka/server/DelayedOperation.scala | 118 + core/src/main/scala/kafka/server/KafkaApis.scala | 4 + .../main/scala/kafka/server/ReplicaManager.scala | 31 ++ .../AbstractCoordinatorConcurrencyTest.scala | 10 +- .../group/GroupCoordinatorConcurrencyTest.scala| 65 .../TransactionCoordinatorConcurrencyTest.scala| 5 +- .../unit/kafka/server/DelayedOperationTest.scala | 96 +++-- 12 files changed, 257 insertions(+), 176 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index fb0576e..fc0852f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -103,19 +103,7 @@ class DelayedOperations(topicPartition: TopicPartition, fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition)) } - def checkAndCompleteProduce(): Unit = { -produce.checkAndComplete(TopicPartitionOperationKey(topicPartition)) - } - - def checkAndCompleteDeleteRecords(): Unit = { -deleteRecords.checkAndComplete(TopicPartitionOperationKey(topicPartition)) - } - def numDelayedDelete: Int = deleteRecords.numDelayed - - def numDelayedFetch: Int = fetch.numDelayed - - def numDelayedProduce: Int = produce.numDelayed } object Partition extends KafkaMetricsGroup { @@ -1010,15 +998,7 @@ class Partition(val topicPartition: TopicPartition, } } -// some delayed operations may be unblocked after HW changed -if (leaderHWIncremented) - tryCompleteDelayedRequests() -else { - // probably unblock some follower fetch requests since log end offset has been updated - delayedOperations.checkAndCompleteFetch() -} - -info +info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same) } def readRecords(fetchOffset: Long, diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala index dad2b1e..92e8835 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala @@ -36,8 +36,15 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator, rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _) - override def onExpiration() = coordinator.onExpireJoin() - override def onComplete() = coordinator.onCompleteJoin(group) + override def onExpiration(): Unit = { +coordinator.onExpireJoin() +// try to complete delayed actions introduced by coordinator.onCompleteJoin +tryToCompleteDelayedAction() + } + override def onComplete(): Unit = coordinator.onCompleteJoin(group) + + // TODO: remove this ugly chain after we move the action queue to handler thread + private def tryToCompleteDelayedAction(): Unit = coordinator.groupManager.replicaManager.tryCompleteActions() } /** diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 1fcdd91..9d58b2d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -56,7 +56,7 @@ import scala.jdk.CollectionConverters._ class GroupMetadataManager(brokerId: Int, interBrokerProtocolVersion: ApiVersion, config: OffsetConfig, - replicaManager: ReplicaManager, + val replicaManager: ReplicaManager
[kafka] branch trunk updated (dd2b9ec -> c2273ad)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from dd2b9ec KAFKA-5636: Improve handling of "early" records in sliding windows (#9157) add c2273ad KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/cluster/Partition.scala | 22 +--- .../kafka/coordinator/group/DelayedJoin.scala | 11 +- .../coordinator/group/GroupMetadataManager.scala | 2 +- core/src/main/scala/kafka/log/Log.scala| 13 ++- core/src/main/scala/kafka/server/ActionQueue.scala | 56 ++ .../main/scala/kafka/server/DelayedOperation.scala | 118 + core/src/main/scala/kafka/server/KafkaApis.scala | 4 + .../main/scala/kafka/server/ReplicaManager.scala | 31 ++ .../AbstractCoordinatorConcurrencyTest.scala | 10 +- .../group/GroupCoordinatorConcurrencyTest.scala| 65 .../TransactionCoordinatorConcurrencyTest.scala| 5 +- .../unit/kafka/server/DelayedOperationTest.scala | 96 +++-- 12 files changed, 257 insertions(+), 176 deletions(-) create mode 100644 core/src/main/scala/kafka/server/ActionQueue.scala
[kafka] branch trunk updated (77f6175 -> 77a0bba)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 77f6175 update source link in interactive query page (#9261) add 77a0bba KAFKA-8362: fix the old checkpoint won't be removed after alter log dir (#9178) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++- .../main/scala/kafka/log/LogCleanerManager.scala | 51 +-- core/src/main/scala/kafka/log/LogManager.scala | 11 ++- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- .../checkpoints/LeaderEpochCheckpointFile.scala| 12 ++- .../server/checkpoints/OffsetCheckpointFile.scala | 12 ++- .../unit/kafka/log/LogCleanerManagerTest.scala | 100 - .../LogCleanerParameterizedIntegrationTest.scala | 2 +- .../unit/kafka/server/LogDirFailureTest.scala | 2 +- 9 files changed, 186 insertions(+), 26 deletions(-)
[kafka] branch trunk updated (77f6175 -> 77a0bba)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 77f6175 update source link in interactive query page (#9261) add 77a0bba KAFKA-8362: fix the old checkpoint won't be removed after alter log dir (#9178) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++- .../main/scala/kafka/log/LogCleanerManager.scala | 51 +-- core/src/main/scala/kafka/log/LogManager.scala | 11 ++- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- .../checkpoints/LeaderEpochCheckpointFile.scala| 12 ++- .../server/checkpoints/OffsetCheckpointFile.scala | 12 ++- .../unit/kafka/log/LogCleanerManagerTest.scala | 100 - .../LogCleanerParameterizedIntegrationTest.scala | 2 +- .../unit/kafka/server/LogDirFailureTest.scala | 2 +- 9 files changed, 186 insertions(+), 26 deletions(-)
[kafka] branch trunk updated (77f6175 -> 77a0bba)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 77f6175 update source link in interactive query page (#9261) add 77a0bba KAFKA-8362: fix the old checkpoint won't be removed after alter log dir (#9178) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++- .../main/scala/kafka/log/LogCleanerManager.scala | 51 +-- core/src/main/scala/kafka/log/LogManager.scala | 11 ++- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- .../checkpoints/LeaderEpochCheckpointFile.scala| 12 ++- .../server/checkpoints/OffsetCheckpointFile.scala | 12 ++- .../unit/kafka/log/LogCleanerManagerTest.scala | 100 - .../LogCleanerParameterizedIntegrationTest.scala | 2 +- .../unit/kafka/server/LogDirFailureTest.scala | 2 +- 9 files changed, 186 insertions(+), 26 deletions(-)
[kafka] branch trunk updated (77f6175 -> 77a0bba)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 77f6175 update source link in interactive query page (#9261) add 77a0bba KAFKA-8362: fix the old checkpoint won't be removed after alter log dir (#9178) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/LogCleaner.scala | 14 ++- .../main/scala/kafka/log/LogCleanerManager.scala | 51 +-- core/src/main/scala/kafka/log/LogManager.scala | 11 ++- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- .../checkpoints/LeaderEpochCheckpointFile.scala| 12 ++- .../server/checkpoints/OffsetCheckpointFile.scala | 12 ++- .../unit/kafka/log/LogCleanerManagerTest.scala | 100 - .../LogCleanerParameterizedIntegrationTest.scala | 2 +- .../unit/kafka/server/LogDirFailureTest.scala | 2 +- 9 files changed, 186 insertions(+), 26 deletions(-)
[kafka] branch trunk updated (4e65030 -> fb4f297)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 4e65030 KAFKA-10402: Upgrade system tests to python3 (#9196) add fb4f297 KAFKA-10028: Implement write path for feature versioning system (KIP-584) (#9001) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/clients/admin/Admin.java | 68 +++ ...asOptions.java => DescribeFeaturesOptions.java} | 28 +- ...irsOptions.java => DescribeFeaturesResult.java} | 20 +- .../kafka/clients/admin/FeatureMetadata.java | 111 .../apache/kafka/clients/admin/FeatureUpdate.java | 78 +++ .../kafka/clients/admin/FinalizedVersionRange.java | 84 +++ .../kafka/clients/admin/KafkaAdminClient.java | 152 ++ .../kafka/clients/admin/SupportedVersionRange.java | 82 +++ ...ordsOptions.java => UpdateFeaturesOptions.java} | 11 +- ...TopicsResult.java => UpdateFeaturesResult.java} | 25 +- ...tion.java => FeatureUpdateFailedException.java} | 8 +- .../kafka/common/feature/BaseVersionRange.java | 21 +- .../common/feature/FinalizedVersionRange.java | 4 +- .../common/feature/SupportedVersionRange.java | 10 +- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../org/apache/kafka/common/protocol/Errors.java | 4 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java| 2 + .../kafka/common/requests/ApiVersionsResponse.java | 58 +- .../common/requests/UpdateFeaturesRequest.java | 95 .../common/requests/UpdateFeaturesResponse.java| 109 .../common/message/ApiVersionsResponse.json| 4 +- .../common/message/UpdateFeaturesRequest.json | 35 ++ ...lsResponse.json => UpdateFeaturesResponse.json} | 20 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 236 + .../kafka/clients/admin/MockAdminClient.java | 10 + .../scala/kafka/controller/ControllerState.scala | 7 +- .../scala/kafka/controller/KafkaController.scala | 434 ++- .../main/scala/kafka/server/BrokerFeatures.scala | 116 .../scala/kafka/server/FinalizedFeatureCache.scala | 91 +++- .../server/FinalizedFeatureChangeListener.scala| 16 +- core/src/main/scala/kafka/server/KafkaApis.scala | 56 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 18 +- .../scala/kafka/server/SupportedFeatures.scala | 93 core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +- core/src/main/scala/kafka/zk/ZkData.scala | 41 +- .../controller/ControllerIntegrationTest.scala | 104 +++- .../unit/kafka/server/BrokerFeaturesTest.scala | 106 .../kafka/server/FinalizedFeatureCacheTest.scala | 78 +-- .../FinalizedFeatureChangeListenerTest.scala | 144 +++-- .../scala/unit/kafka/server/KafkaApisTest.scala| 7 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 3 + .../unit/kafka/server/SupportedFeaturesTest.scala | 56 -- .../unit/kafka/server/UpdateFeaturesTest.scala | 581 + .../jmh/metadata/MetadataRequestBenchmark.java | 7 +- 46 files changed, 2831 insertions(+), 415 deletions(-) copy clients/src/main/java/org/apache/kafka/clients/admin/{AlterClientQuotasOptions.java => DescribeFeaturesOptions.java} (53%) copy clients/src/main/java/org/apache/kafka/clients/admin/{DescribeLogDirsOptions.java => DescribeFeaturesResult.java} (69%) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java copy clients/src/main/java/org/apache/kafka/clients/admin/{DeleteRecordsOptions.java => UpdateFeaturesOptions.java} (81%) copy clients/src/main/java/org/apache/kafka/clients/admin/{DeleteTopicsResult.java => UpdateFeaturesResult.java} (70%) copy clients/src/main/java/org/apache/kafka/common/errors/{BrokerNotAvailableException.java => FeatureUpdateFailedException.java} (81%) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java create mode 100644 clients/src/main/resources/common/message/UpdateFeaturesRequest.json copy clients/src/main/resources/common/message/{AlterUserScramCredentialsResponse.json => UpdateFeaturesResponse.json} (60%) create mode 100644 core/src/main/scala/kafka/server/BrokerFeatures.scala delete mode 100644 core/src/main/scala/kafka/server/SupportedFeatures.scala create mode 10
[kafka] branch trunk updated (4e65030 -> fb4f297)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 4e65030 KAFKA-10402: Upgrade system tests to python3 (#9196) add fb4f297 KAFKA-10028: Implement write path for feature versioning system (KIP-584) (#9001) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/clients/admin/Admin.java | 68 +++ ...asOptions.java => DescribeFeaturesOptions.java} | 28 +- ...irsOptions.java => DescribeFeaturesResult.java} | 20 +- .../kafka/clients/admin/FeatureMetadata.java | 111 .../apache/kafka/clients/admin/FeatureUpdate.java | 78 +++ .../kafka/clients/admin/FinalizedVersionRange.java | 84 +++ .../kafka/clients/admin/KafkaAdminClient.java | 152 ++ .../kafka/clients/admin/SupportedVersionRange.java | 82 +++ ...ordsOptions.java => UpdateFeaturesOptions.java} | 11 +- ...TopicsResult.java => UpdateFeaturesResult.java} | 25 +- ...tion.java => FeatureUpdateFailedException.java} | 8 +- .../kafka/common/feature/BaseVersionRange.java | 21 +- .../common/feature/FinalizedVersionRange.java | 4 +- .../common/feature/SupportedVersionRange.java | 10 +- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../org/apache/kafka/common/protocol/Errors.java | 4 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java| 2 + .../kafka/common/requests/ApiVersionsResponse.java | 58 +- .../common/requests/UpdateFeaturesRequest.java | 95 .../common/requests/UpdateFeaturesResponse.java| 109 .../common/message/ApiVersionsResponse.json| 4 +- .../common/message/UpdateFeaturesRequest.json | 35 ++ ...lsResponse.json => UpdateFeaturesResponse.json} | 20 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 236 + .../kafka/clients/admin/MockAdminClient.java | 10 + .../scala/kafka/controller/ControllerState.scala | 7 +- .../scala/kafka/controller/KafkaController.scala | 434 ++- .../main/scala/kafka/server/BrokerFeatures.scala | 116 .../scala/kafka/server/FinalizedFeatureCache.scala | 91 +++- .../server/FinalizedFeatureChangeListener.scala| 16 +- core/src/main/scala/kafka/server/KafkaApis.scala | 56 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 18 +- .../scala/kafka/server/SupportedFeatures.scala | 93 core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +- core/src/main/scala/kafka/zk/ZkData.scala | 41 +- .../controller/ControllerIntegrationTest.scala | 104 +++- .../unit/kafka/server/BrokerFeaturesTest.scala | 106 .../kafka/server/FinalizedFeatureCacheTest.scala | 78 +-- .../FinalizedFeatureChangeListenerTest.scala | 144 +++-- .../scala/unit/kafka/server/KafkaApisTest.scala| 7 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 3 + .../unit/kafka/server/SupportedFeaturesTest.scala | 56 -- .../unit/kafka/server/UpdateFeaturesTest.scala | 581 + .../jmh/metadata/MetadataRequestBenchmark.java | 7 +- 46 files changed, 2831 insertions(+), 415 deletions(-) copy clients/src/main/java/org/apache/kafka/clients/admin/{AlterClientQuotasOptions.java => DescribeFeaturesOptions.java} (53%) copy clients/src/main/java/org/apache/kafka/clients/admin/{DescribeLogDirsOptions.java => DescribeFeaturesResult.java} (69%) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java copy clients/src/main/java/org/apache/kafka/clients/admin/{DeleteRecordsOptions.java => UpdateFeaturesOptions.java} (81%) copy clients/src/main/java/org/apache/kafka/clients/admin/{DeleteTopicsResult.java => UpdateFeaturesResult.java} (70%) copy clients/src/main/java/org/apache/kafka/common/errors/{BrokerNotAvailableException.java => FeatureUpdateFailedException.java} (81%) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java create mode 100644 clients/src/main/resources/common/message/UpdateFeaturesRequest.json copy clients/src/main/resources/common/message/{AlterUserScramCredentialsResponse.json => UpdateFeaturesResponse.json} (60%) create mode 100644 core/src/main/scala/kafka/server/BrokerFeatures.scala delete mode 100644 core/src/main/scala/kafka/server/SupportedFeatures.scala create mode 10
[kafka] branch 2.7 updated: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#9393)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new ba24e6e KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#9393) ba24e6e is described below commit ba24e6ec4d730838e49df0f6a02aa3210322f9c7 Author: Kowshik Prakasam AuthorDate: Thu Oct 8 10:05:29 2020 -0700 KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#9393) In this PR, I have addressed the review comments from @chia7712 in #9001 which were provided after #9001 was merged. The changes are made mainly to KafkaAdminClient: Improve error message in updateFeatures api when feature name is empty. Propagate top-level error message in updateFeatures api. Add an empty-parameter variety for describeFeatures api. Minor documentation updates to @param and @return to make these resemble other apis. Reviewers: Chia-Ping Tsai chia7...@gmail.com, Jun Rao jun...@gmail.com --- .../java/org/apache/kafka/clients/admin/Admin.java | 22 -- .../kafka/clients/admin/KafkaAdminClient.java | 11 ++- .../kafka/clients/admin/KafkaAdminClientTest.java | 19 ++- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 96620df..503dfd7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1306,6 +1306,17 @@ public interface Admin extends AutoCloseable { */ AlterUserScramCredentialsResult alterUserScramCredentials(List alterations, AlterUserScramCredentialsOptions options); +/** + * Describes finalized as well as supported features. + * + * This is a convenience method for {@link #describeFeatures(DescribeFeaturesOptions)} with default options. + * See the overload for more details. + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +default DescribeFeaturesResult describeFeatures() { +return describeFeatures(new DescribeFeaturesOptions()); +} /** * Describes finalized as well as supported features. By default, the request is issued to any @@ -1320,9 +1331,9 @@ public interface Admin extends AutoCloseable { * If the request timed out before the describe operation could finish. * * - * @param options the options to use * - * @return the {@link DescribeFeaturesResult} containing the result + * @param options the options to use + * @return the {@link DescribeFeaturesResult} containing the result */ DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); @@ -1367,10 +1378,9 @@ public interface Admin extends AutoCloseable { * * This operation is supported by brokers with version 2.7.0 or higher. - * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate} - * @param options the options to use - * - * @return the {@link UpdateFeaturesResult} containing the result + * @param featureUpdates the map of finalized feature name to {@link FeatureUpdate} + * @param options the options to use + * @return the {@link UpdateFeaturesResult} containing the result */ UpdateFeaturesResult updateFeatures(Map featureUpdates, UpdateFeaturesOptions options); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 43dc197..ba29b20 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4410,6 +4410,10 @@ public class KafkaAdminClient extends AdminClient { final Map> updateFutures = new HashMap<>(); for (final Map.Entry entry : featureUpdates.entrySet()) { +final String feature = entry.getKey(); +if (feature.trim().isEmpty()) { +throw new IllegalArgumentException("Provided feature can not be empty."); +} updateFutures.put(entry.getKey(), new KafkaFutureImpl<>()); } @@ -4424,10 +4428,6 @@ public class KafkaAdminClient extends AdminClient { for (Map.Entry entry : featureUpdates.entrySet()) { final String feature = entry.getKey(); final FeatureUpdate update = entry.getValue(); -if (feature.trim().isEmpty()) { -throw new IllegalAr
[kafka] branch trunk updated (de546ba -> de41834)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from de546ba MINOR: correct package of LinuxIoMetricsCollector (#9271) add de41834 KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (#9393) No new revisions were added by this update. Summary of changes: .../java/org/apache/kafka/clients/admin/Admin.java | 22 -- .../kafka/clients/admin/KafkaAdminClient.java | 11 ++- .../kafka/clients/admin/KafkaAdminClientTest.java | 19 ++- 3 files changed, 28 insertions(+), 24 deletions(-)
[kafka] branch trunk updated: MINOR: Check for active controller in UpdateFeatures request processing logic (#9436)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b752097 MINOR: Check for active controller in UpdateFeatures request processing logic (#9436) b752097 is described below commit b752097f849cef17d5ffcbaec1d56f32825167da Author: Kowshik Prakasam AuthorDate: Thu Oct 15 10:23:05 2020 -0700 MINOR: Check for active controller in UpdateFeatures request processing logic (#9436) Reviewers: Jun Rao --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ba8cd08..5b5c9c6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3133,6 +3133,8 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED))) +} else if (!controller.isActive) { + sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER))) } else if (!config.isFeatureVersioningSupported) { sendResponseCallback(Left(new ApiError(Errors.INVALID_REQUEST, "Feature versioning system is disabled."))) } else {
[kafka] branch trunk updated (270881c -> d99fe49)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 270881c KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343) add d99fe49 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) No new revisions were added by this update. Summary of changes: bin/{kafka-log-dirs.sh => kafka-features.sh} | 2 +- .../main/scala/kafka/admin/FeatureCommand.scala| 408 + .../unit/kafka/admin/FeatureCommandTest.scala | 244 3 files changed, 653 insertions(+), 1 deletion(-) copy bin/{kafka-log-dirs.sh => kafka-features.sh} (92%) create mode 100644 core/src/main/scala/kafka/admin/FeatureCommand.scala create mode 100644 core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
[kafka] branch trunk updated (270881c -> d99fe49)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 270881c KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343) add d99fe49 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) No new revisions were added by this update. Summary of changes: bin/{kafka-log-dirs.sh => kafka-features.sh} | 2 +- .../main/scala/kafka/admin/FeatureCommand.scala| 408 + .../unit/kafka/admin/FeatureCommandTest.scala | 244 3 files changed, 653 insertions(+), 1 deletion(-) copy bin/{kafka-log-dirs.sh => kafka-features.sh} (92%) create mode 100644 core/src/main/scala/kafka/admin/FeatureCommand.scala create mode 100644 core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
[kafka] branch trunk updated (46e48d7 -> 24290de)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 46e48d7 MINOR: Implement ApiError#equals and hashCode (#9390) add 24290de KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories (#7929) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/Log.scala| 55 ++- core/src/main/scala/kafka/log/LogManager.scala | 32 ++-- .../scala/kafka/log/ProducerStateManager.scala | 177 +++-- .../test/scala/unit/kafka/log/LogManagerTest.scala | 12 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 168 --- .../unit/kafka/log/ProducerStateManagerTest.scala | 34 6 files changed, 344 insertions(+), 134 deletions(-)
[kafka] branch trunk updated (46e48d7 -> 24290de)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 46e48d7 MINOR: Implement ApiError#equals and hashCode (#9390) add 24290de KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories (#7929) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/log/Log.scala| 55 ++- core/src/main/scala/kafka/log/LogManager.scala | 32 ++-- .../scala/kafka/log/ProducerStateManager.scala | 177 +++-- .../test/scala/unit/kafka/log/LogManagerTest.scala | 12 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 168 --- .../unit/kafka/log/ProducerStateManagerTest.scala | 34 6 files changed, 344 insertions(+), 134 deletions(-)
[kafka] branch 2.7 updated (190acf7 -> 307e062)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 190acf7 KAFKA-10605: Deprecate old PAPI registration methods (#9448) add 307e062 MINOR: Update jdk and maven names in Jenkinsfile (#9453) No new revisions were added by this update. Summary of changes: Jenkinsfile | 8 1 file changed, 4 insertions(+), 4 deletions(-)
[kafka] branch 2.7 updated (190acf7 -> 307e062)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 190acf7 KAFKA-10605: Deprecate old PAPI registration methods (#9448) add 307e062 MINOR: Update jdk and maven names in Jenkinsfile (#9453) No new revisions were added by this update. Summary of changes: Jenkinsfile | 8 1 file changed, 4 insertions(+), 4 deletions(-)
[kafka] branch 2.7 updated: KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new eb14539 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) eb14539 is described below commit eb1453990b147f2f8d6c9d1209990c365739f23b Author: Kowshik Prakasam AuthorDate: Tue Oct 20 09:02:08 2020 -0700 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR: --describe: Describe supported and finalized features. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config ] Optionally, use the --from-controller option to get features from the controller. --upgrade-all: Upgrades all features known to the tool to their highest max version levels. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config ] Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. --downgrade-all: Downgrades existing finalized features to the highest max version levels known to this tool. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config ]. Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. Reviewers: Boyang Chen , Jun Rao --- bin/kafka-features.sh | 17 + .../main/scala/kafka/admin/FeatureCommand.scala| 408 + .../unit/kafka/admin/FeatureCommandTest.scala | 244 3 files changed, 669 insertions(+) diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh new file mode 100755 index 000..9dd9f16 --- /dev/null +++ b/bin/kafka-features.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@" diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala new file mode 100644 index 000..9cc0a10 --- /dev/null +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils +import java.util.Properties + +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ +import joptsimple.OptionSpec + +import scala.concurrent.ExecutionException + +object FeatureCommand { + + def main(args: Array[String]): Unit = { +val opts = new FeatureCommandOptions(args) +val feature
[kafka] branch 2.7 updated: KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new eb14539 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) eb14539 is described below commit eb1453990b147f2f8d6c9d1209990c365739f23b Author: Kowshik Prakasam AuthorDate: Tue Oct 20 09:02:08 2020 -0700 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR: --describe: Describe supported and finalized features. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config ] Optionally, use the --from-controller option to get features from the controller. --upgrade-all: Upgrades all features known to the tool to their highest max version levels. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config ] Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. --downgrade-all: Downgrades existing finalized features to the highest max version levels known to this tool. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config ]. Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. Reviewers: Boyang Chen , Jun Rao --- bin/kafka-features.sh | 17 + .../main/scala/kafka/admin/FeatureCommand.scala| 408 + .../unit/kafka/admin/FeatureCommandTest.scala | 244 3 files changed, 669 insertions(+) diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh new file mode 100755 index 000..9dd9f16 --- /dev/null +++ b/bin/kafka-features.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@" diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala new file mode 100644 index 000..9cc0a10 --- /dev/null +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils +import java.util.Properties + +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ +import joptsimple.OptionSpec + +import scala.concurrent.ExecutionException + +object FeatureCommand { + + def main(args: Array[String]): Unit = { +val opts = new FeatureCommandOptions(args) +val feature
[kafka] branch 2.7 updated: KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new eb14539 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) eb14539 is described below commit eb1453990b147f2f8d6c9d1209990c365739f23b Author: Kowshik Prakasam AuthorDate: Tue Oct 20 09:02:08 2020 -0700 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR: --describe: Describe supported and finalized features. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config ] Optionally, use the --from-controller option to get features from the controller. --upgrade-all: Upgrades all features known to the tool to their highest max version levels. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config ] Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. --downgrade-all: Downgrades existing finalized features to the highest max version levels known to this tool. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config ]. Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. Reviewers: Boyang Chen , Jun Rao --- bin/kafka-features.sh | 17 + .../main/scala/kafka/admin/FeatureCommand.scala| 408 + .../unit/kafka/admin/FeatureCommandTest.scala | 244 3 files changed, 669 insertions(+) diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh new file mode 100755 index 000..9dd9f16 --- /dev/null +++ b/bin/kafka-features.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@" diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala new file mode 100644 index 000..9cc0a10 --- /dev/null +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils +import java.util.Properties + +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ +import joptsimple.OptionSpec + +import scala.concurrent.ExecutionException + +object FeatureCommand { + + def main(args: Array[String]): Unit = { +val opts = new FeatureCommandOptions(args) +val feature
[kafka] branch 2.7 updated: MINOR: Check for active controller in UpdateFeatures request processing logic (#9456)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new 8bebffc MINOR: Check for active controller in UpdateFeatures request processing logic (#9456) 8bebffc is described below commit 8bebffc9baae1b763c22e9ccf3f2920ef650bf51 Author: Kowshik Prakasam AuthorDate: Tue Oct 20 09:00:57 2020 -0700 MINOR: Check for active controller in UpdateFeatures request processing logic (#9456) Reviewers: Jun Rao --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ba8cd08..5b5c9c6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3133,6 +3133,8 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { sendResponseCallback(Left(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED))) +} else if (!controller.isActive) { + sendResponseCallback(Left(new ApiError(Errors.NOT_CONTROLLER))) } else if (!config.isFeatureVersioningSupported) { sendResponseCallback(Left(new ApiError(Errors.INVALID_REQUEST, "Feature versioning system is disabled."))) } else {
[kafka] branch 2.7 updated: KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.7 by this push: new eb14539 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) eb14539 is described below commit eb1453990b147f2f8d6c9d1209990c365739f23b Author: Kowshik Prakasam AuthorDate: Tue Oct 20 09:02:08 2020 -0700 KAFKA-10599: Implement basic CLI tool for feature versioning system (#9409) (#9455) This PR implements a basic CLI tool for feature versioning system. The KIP-584 write up has been updated to suit this PR. The following is implemented in this PR: --describe: Describe supported and finalized features. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --describe [--from-controller] [--command-config ] Optionally, use the --from-controller option to get features from the controller. --upgrade-all: Upgrades all features known to the tool to their highest max version levels. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --upgrade-all [--dry-run] [--command-config ] Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. --downgrade-all: Downgrades existing finalized features to the highest max version levels known to this tool. Usage: $> ./bin/kafka-features.sh --bootstrap-server host1:port1, host2:port2 --downgrade-all [--dry-run] [--command-config ]. Optionally, use the --dry-run CLI option to preview the feature updates without actually applying them. Reviewers: Boyang Chen , Jun Rao --- bin/kafka-features.sh | 17 + .../main/scala/kafka/admin/FeatureCommand.scala| 408 + .../unit/kafka/admin/FeatureCommandTest.scala | 244 3 files changed, 669 insertions(+) diff --git a/bin/kafka-features.sh b/bin/kafka-features.sh new file mode 100755 index 000..9dd9f16 --- /dev/null +++ b/bin/kafka-features.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.FeatureCommand "$@" diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala new file mode 100644 index 000..9cc0a10 --- /dev/null +++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils +import java.util.Properties + +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ +import joptsimple.OptionSpec + +import scala.concurrent.ExecutionException + +object FeatureCommand { + + def main(args: Array[String]): Unit = { +val opts = new FeatureCommandOptions(args) +val feature
[kafka] branch 2.6 updated: KAFKA-10257 system test kafkatest.tests.core.security_rolling_upgrade_test fails (#9021)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new cbcb262 KAFKA-10257 system test kafkatest.tests.core.security_rolling_upgrade_test fails (#9021) cbcb262 is described below commit cbcb2621852edf0efeb52d005ca844b411f3c9a8 Author: Chia-Ping Tsai AuthorDate: Thu Jul 16 02:33:49 2020 +0800 KAFKA-10257 system test kafkatest.tests.core.security_rolling_upgrade_test fails (#9021) security_rolling_upgrade_test may change the security listener and then restart Kafka servers. has_sasl and has_ssl get out-of-date due to cached _security_config. This PR offers a simple fix that we always check the changes of port mapping and then update the sasl/ssl flag. Reviewers: Ismael Juma , Jun Rao --- tests/kafkatest/services/kafka/kafka.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index cbd5a06..8c4773e 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -222,9 +222,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): interbroker_sasl_mechanism=self.interbroker_sasl_mechanism, listener_security_config=self.listener_security_config, tls_version=self.tls_version) -for port in self.port_mappings.values(): -if port.open: - self._security_config.enable_security_protocol(port.security_protocol) +for port in self.port_mappings.values(): +if port.open: + self._security_config.enable_security_protocol(port.security_protocol) if self.zk.zk_sasl: self._security_config.enable_sasl() self._security_config.zk_sasl = self.zk.zk_sasl
[kafka] branch trunk updated (934033b -> 598a0d1)
This is an automated email from the ASF dual-hosted git repository. junrao pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 934033b MINOR: update required MacOS version (#9025) add 598a0d1 KAFKA-10257 system test kafkatest.tests.core.security_rolling_upgrade_test fails (#9021) No new revisions were added by this update. Summary of changes: tests/kafkatest/services/kafka/kafka.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-)