[ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387293#comment-16387293
 ] 

ASF GitHub Bot commented on KAFKA-6106:
---------------------------------------

kamalcph closed pull request #4564: KAFKA-6106; Postpone normal processing of 
tasks within a thread until…
URL: https://github.com/apache/kafka/pull/4564
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde30e9..d61b281eed1 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, 
ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
+    val response = retryRequestUntilConnected(setDataRequest)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: 
%s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = 
retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
+      
deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c25c58..e44c2c94e52 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, 
NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+  var otherZkClient: KafkaZkClient = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    otherZkClient = KafkaZkClient(zkConnect, 
zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (otherZkClient != null)
+      otherZkClient.close()
+    super.tearDown()
+  }
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
 
     // test with non-existing topic
+    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
@@ -108,6 +140,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
+    assertTrue(zkClient.topicExists(topic1))
+
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -214,6 +248,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     
assertEquals(Some("""{"version":1,"partitions":[{"topic":"topic-b","partition":0}]}"""),
 dataAsString(expectedPath))
   }
 
+  @Test
+  def testIsrChangeNotificationGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, 
zkClient.getAllIsrChangeNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, 
zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+
+    assertEquals(Set("0000000000", "0000000001"), 
zkClient.getAllIsrChangeNotifications.toSet)
+
+    // A partition can have multiple notifications
+    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", 
"0000000001")))
+  }
+
+  @Test
+  def testIsrChangeNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+    zkClient.propagateIsrChanges(Set(topicPartition11))
+
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    // Should not fail if called on a non-existent notification
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+    assertEquals(Set("0000000000", "0000000002"), 
zkClient.getAllIsrChangeNotifications.toSet)
+    zkClient.deleteIsrChangeNotifications()
+    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+  }
+
   @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
@@ -237,6 +308,54 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Some("""{"version":1,"broker":4,"event":1}"""), 
dataAsString(expectedPath))
   }
 
+  @Test
+  def testLogDirGetters(): Unit = {
+    assertEquals("getAllLogDirEventNotifications failed for non existing 
parent ZK node",
+      Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent 
ZK node",
+      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    zkClient.propagateLogDirEvent(brokerId)
+
+    assertEquals(Seq(3), 
zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.propagateLogDirEvent(brokerId)
+
+    val anotherBrokerId = 4
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+    assertEquals(notifications012.toSet, 
zkClient.getAllLogDirEventNotifications.toSet)
+    assertEquals(Seq(3, 3, 4), 
zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+  }
+
+  @Test
+  def testLogDirEventNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    val anotherBrokerId = 4
+
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications()
+    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+  }
+
   @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
@@ -377,10 +496,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicPathMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testDeletePath(): Unit = {
+    val path = "/a/b/c"
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path)
+    assertFalse(zkClient.pathExists(path))
+  }
+
+  @Test
+  def testDeleteTopicZNode(): Unit = {
+    zkClient.deleteTopicZNode(topic1)
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.deleteTopicZNode(topic1)
+    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+  }
 
+  @Test
+  def testDeleteTopicPathMethods() {
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -394,18 +526,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
+  private def assertPathExistenceAndData(expectedPath: String, data: String): 
Unit = {
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some(data), dataAsString(expectedPath))
+   }
+
+  @Test
+  def testCreateTokenChangeNotification(): Unit = {
+    intercept[NoNodeException] {
+      zkClient.createTokenChangeNotification("delegationToken")
+    }
+    zkClient.createDelegationTokenPaths()
+
+    zkClient.createTokenChangeNotification("delegationToken")
+    
assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000",
 "delegationToken")
+  }
+
   @Test
   def testEntityConfigManagementMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
-
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, "1024")
-    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
 
@@ -421,15 +561,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testBrokerRegistrationMethods() {
+  def testCreateConfigChangeNotification(): Unit = {
+    intercept[NoNodeException] {
+      
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic,
 topic1))
+    }
+
+    zkClient.createTopLevelPaths()
+    
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic,
 topic1))
+
+    assertPathExistenceAndData(
+      "/config/changes/config_change_0000000000",
+      """{"version":2,"entity_path":"/config/topics/topic1"}""")
+  }
+
+  private def createLogProps(bytesProp: Int): Properties = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps
+  }
+
+  private val logProps = createLogProps(1024)
+
+  @Test
+  def testGetLogConfigs(): Unit = {
+    val emptyConfig = LogConfig(Collections.emptyMap())
+    assertEquals("Non existent config, no defaults",
+      (Map(topic1 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+    val logProps2 = createLogProps(2048)
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals("One existing and one non-existent topic",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+    assertEquals("Two existing topics",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), 
Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    val logProps1WithMoreValues = createLogProps(1024)
+    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+    assertEquals("Config with defaults",
+      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1),
+        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", 
LogConfig.SegmentBytesProp -> "128").asJava))
+  }
+
+  private def createBrokerInfo(id: Int, host: String, port: Int, 
securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, 
ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), 
ApiVersion.latestVersion, jmxPort = port + 10)
+
+  @Test
+  def testRegisterBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = BrokerInfo(Broker(1,
-      Seq(new EndPoint("test.host", 9999, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), 
SecurityProtocol.PLAINTEXT)),
-      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, 
SecurityProtocol.PLAINTEXT)
+    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 
9995, SecurityProtocol.SSL)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+    assertEquals("Other ZK clients can read broker info", 
Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+    // Node exists, owned by current session - no error, no update
+    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+    // Other client tries to register broker with same id causes failure, info 
is not changed in ZK
+    intercept[NodeExistsException] {
+      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    }
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+  }
+
+  @Test
+  def testGetBrokerMethods(): Unit = {
+    zkClient.createTopLevelPaths()
+
+    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+    assertEquals(None, zkClient.getBroker(0))
+
+    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, 
SecurityProtocol.PLAINTEXT)
+    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, 
SecurityProtocol.SSL)
+
+    zkClient.registerBrokerInZk(brokerInfo1)
+    otherZkClient.registerBrokerInZk(brokerInfo0)
+
+    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+    assertEquals(
+      Seq(brokerInfo0.broker, brokerInfo1.broker),
+      zkClient.getAllBrokersInCluster
+    )
+    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+  }
+
+  @Test
+  def testUpdateBrokerInfo(): Unit = {
+    zkClient.createTopLevelPaths()
+
+    // Updating info of a broker not existing in ZK fails
+    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, 
SecurityProtocol.PLAINTEXT)
+    intercept[NoNodeException]{
+      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    }
+
+    zkClient.registerBrokerInZk(originalBrokerInfo)
+
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, 
SecurityProtocol.SSL)
+    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Other ZK clients can update info
+    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+  }
+
+  private def statWithVersion(version: Int): Stat = {
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    stat.setVersion(version)
+    stat
+  }
+
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): 
Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    Map(
+      topicPartition10 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 
+ state), zkVersion = zkVersion),
+        controllerEpoch = 4),
+      topicPartition11 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + 
state, 2 + state), zkVersion = zkVersion),
+        controllerEpoch = 4))
+
+  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, 
LeaderIsrAndControllerEpoch] =
+    leaderIsrAndControllerEpochs(0, 0)
+
+  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = 
initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, 
LeaderAndIsr] =
+    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+  private def checkUpdateLeaderAndIsrResult(
+                  expectedSuccessfulPartitions: Map[TopicPartition, 
LeaderAndIsr],
+                  expectedPartitionsToRetry: Seq[TopicPartition],
+                  expectedFailedPartitions: Map[TopicPartition, (Class[_], 
String)],
+                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): 
Unit = {
+    val failedPartitionsExcerpt =
+      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => 
(e.getClass, e.getMessage))
+    assertEquals("Permanently failed updates do not match expected",
+      expectedFailedPartitions, failedPartitionsExcerpt)
+    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+      expectedPartitionsToRetry, 
actualUpdateLeaderAndIsrResult.partitionsToRetry)
+    assertEquals("Successful updates do not match expected",
+      expectedSuccessfulPartitions, 
actualUpdateLeaderAndIsrResult.successfulPartitions)
+  }
+
+  @Test
+  def testUpdateLeaderAndIsr(): Unit = {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Non-existing topicPartitions
+    checkUpdateLeaderAndIsrResult(
+        Map.empty,
+        mutable.ArrayBuffer.empty,
+      Map(
+        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = 
NoNode for /brokers/topics/topic1/partitions/0/state"),
+        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = 
NoNode for /brokers/topics/topic1/partitions/1/state")),
+      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 1, zkVersion = 1),
+      mutable.ArrayBuffer.empty,
+      Map.empty,
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 
0),controllerEpoch = 4))
+
+    // Try to update with wrong ZK version
+    checkUpdateLeaderAndIsrResult(
+      Map.empty,
+      ArrayBuffer(topicPartition10, topicPartition11),
+      Map.empty,
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 
0),controllerEpoch = 4))
+
+    // Trigger successful, to be retried and failed partitions in same call
+    val mixedState = Map(
+      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = 
List(4, 5), zkVersion = 1),
+      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = 
List(3, 4), zkVersion = 0),
+      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = 
List(3, 4), zkVersion = 0))
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+      ArrayBuffer(topicPartition11),
+      Map(
+        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = 
NoNode for /brokers/topics/topic2/partitions/0/state")),
+      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+  }
+
+  private def checkGetDataResponse(
+      leaderIsrAndControllerEpochs: 
Map[TopicPartition,LeaderIsrAndControllerEpoch],
+      topicPartition: TopicPartition,
+      response: GetDataResponse): Unit = {
+    val zkVersion = 
leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+    assertEquals(Code.OK, response.resultCode)
+    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+    assertEquals(Some(topicPartition), response.ctx)
+    assertEquals(
+      Some(leaderIsrAndControllerEpochs(topicPartition)),
+      TopicPartitionStateZNode.decode(response.data, 
statWithVersion(zkVersion)))
+  }
+
+  private def eraseMetadata(response: CreateResponse): CreateResponse =
+    response.copy(metadata = ResponseMetadata(0, 0))
+
+  @Test
+  def testGetTopicsAndPartitions(): Unit = {
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.createRecursive(TopicZNode.path(topic2))
+    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(Set(topicPartition10, topicPartition11), 
zkClient.getAllPartitions)
+  }
+
+  @Test
+  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    assertEquals(
+      Seq(
+        CreateResponse(Code.OK, 
TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 
0)),
+        CreateResponse(Code.OK, 
TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 
0))),
+      
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+        .map(eraseMetadata).toList)
+
+    val getResponses = 
zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => 
checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+    // Trying to create existing topicPartition states fails
+    assertEquals(
+      Seq(
+        CreateResponse(Code.NODEEXISTS, 
TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          null, ResponseMetadata(0, 0)),
+        CreateResponse(Code.NODEEXISTS, 
TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          null, ResponseMetadata(0, 0))),
+      
zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+  }
+
+  @Test
+  def testSetTopicPartitionStatesRaw(): Unit = {
+
+    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: 
Code, stat: Stat) =
+      topicPartitions.map { topicPartition =>
+        SetDataResponse(resultCode, 
TopicPartitionStateZNode.path(topicPartition),
+          Some(topicPartition), stat, ResponseMetadata(0, 0))
+      }
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Trying to set non-existing topicPartition's data results in NONODE 
responses
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, 
topicPartition11)(Code.NONODE, null),
+      
zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, 
statWithVersion(1)),
+      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 
1, zkVersion = 0)).map {
+        eraseMetadataAndStat}.toList)
+
+
+    val getResponses = 
zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => 
checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), 
tp, r)}
+
+    // Other ZK client can also write the state of a partition
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, 
statWithVersion(2)),
+      
otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 
2, zkVersion = 1)).map {
+        eraseMetadataAndStat}.toList)
+  }
+
+  @Test
+  def testReassignPartitionsInProgress(): Unit = {
+    assertFalse(zkClient.reassignPartitionsInProgress)
+    zkClient.createRecursive(ReassignPartitionsZNode.path)
+    assertTrue(zkClient.reassignPartitionsInProgress)
+  }
+
+  @Test
+  def testGetTopicPartitionStates(): Unit = {
+    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(
+      initialLeaderIsrAndControllerEpochs,
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+    )
+
+    assertEquals(
+      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionState(topicPartition10)
+    )
+
+    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+    val notExistingPartition = new TopicPartition(topic1, 2)
+    
assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+    assertEquals(
+      Map(topicPartition10 -> 
initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, 
notExistingPartition))
+    )
+
+    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+  }
+
+  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse 
= {
+    val stat = if (response.stat != null) 
statWithVersion(response.stat.getVersion) else null
+    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+  }
+
+  @Test
+  def testControllerEpochMethods(): Unit = {
+    assertEquals(None, zkClient.getControllerEpoch)
+
+    assertEquals("Setting non existing nodes should return NONODE results",
+      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, 
ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+    assertEquals("Creating non existing nodes is OK",
+      CreateResponse(Code.OK, ControllerEpochZNode.path, None, 
ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+    assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, 
ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+    assertEquals("Updating existing nodes is OK",
+      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, 
statWithVersion(1), ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+    assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Updating with wrong ZK version returns BADVERSION",
+      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, 
ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+  }
+
+  @Test
+  def testControllerManagementMethods(): Unit = {
+    // No controller
+    assertEquals(None, zkClient.getControllerId)
+    // Create controller
+    zkClient.checkedEphemeralCreate(ControllerZNode.path, 
ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+    assertEquals(Some(1), zkClient.getControllerId)
+    zkClient.deleteController()
+    assertEquals(None, zkClient.getControllerId)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    val mockPath = "/foo"
+
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+
+      override val path: String = mockPath
+    }
+
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+    zkClient.createRecursive(mockPath)
+    assertTrue("Failed to receive create notification", 
znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -458,7 +982,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
-    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new 
TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1021,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1035,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+    //test deleting token
+    assertTrue(zkClient.deleteDelegationToken(tokenId))
+    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8529c9eca88..7e92c1f44c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,19 +73,6 @@ void addNewTask(final T task) {
         created.put(task.id(), task);
     }
 
-    Set<TopicPartition> uninitializedPartitions() {
-        if (created.isEmpty()) {
-            return Collections.emptySet();
-        }
-        final Set<TopicPartition> partitions = new HashSet<>();
-        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-            if (entry.getValue().hasStateStores()) {
-                partitions.addAll(entry.getValue().partitions());
-            }
-        }
-        return partitions;
-    }
-
     /**
      * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9f02834dd75..290ad81735b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -109,9 +109,9 @@ void createTasks(final Collection<TopicPartition> 
assignment) {
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
         addStreamTasks(assignment);
         addStandbyTasks();
-        final Set<TopicPartition> partitions = 
active.uninitializedPartitions();
-        log.trace("Pausing partitions: {}", partitions);
-        consumer.pause(partitions);
+        // Pause all the partitions until the underlying state store is ready 
for all the active tasks.
+        log.trace("Pausing partitions: {}", assignment);
+        consumer.pause(assignment);
     }
 
     /**
@@ -322,18 +322,17 @@ void setConsumer(final Consumer<byte[], byte[]> consumer) 
{
      * @throws TaskMigratedException if another thread wrote to the changelog 
topic that is currently restored
      */
     boolean updateNewAndRestoringTasks() {
-        final Set<TopicPartition> resumed = active.initializeNewTasks();
+        active.initializeNewTasks();
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = 
changelogReader.restore(active);
 
-        resumed.addAll(active.updateRestored(restored));
+        active.updateRestored(restored);
 
-        if (!resumed.isEmpty()) {
-            log.trace("Resuming partitions {}", resumed);
-            consumer.resume(resumed);
-        }
         if (active.allTasksRunning()) {
+            Set<TopicPartition> assignment = consumer.assignment();
+            log.trace("Resuming partitions {}", assignment);
+            consumer.resume(assignment);
             assignStandbyPartitions();
             return true;
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 4bb7828fe25..a838fd1ee52 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -57,36 +57,6 @@ public void before() {
         EasyMock.expect(t2.id()).andReturn(taskId2).anyTimes();
     }
 
-    @Test
-    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(true);
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = 
assignedTasks.uninitializedPartitions();
-        assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
-        EasyMock.verify(t1, t2);
-    }
-
-    @Test
-    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = 
assignedTasks.uninitializedPartitions();
-        assertTrue(partitions.isEmpty());
-        EasyMock.verify(t1, t2);
-    }
-
     @Test
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 482b764f3ff..b22d98ee41c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -158,6 +158,10 @@ public void testPartitionAssignmentChangeForSingleGroup() {
         // assign single partition
         assignedPartitions = Collections.singletonList(t1p1);
         thread.taskManager().setAssignmentMetadata(Collections.<TaskId, 
Set<TopicPartition>>emptyMap(), Collections.<TaskId, 
Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
@@ -378,8 +382,13 @@ public void 
shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new 
HashSet<>(assignedPartitions));
 
         assertEquals(1, clientSupplier.producers.size());
@@ -411,6 +420,12 @@ public void 
shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable()
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new 
HashSet<>(assignedPartitions));
 
         thread.runOnce(-1);
@@ -439,7 +454,12 @@ public void 
shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
 
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -595,6 +615,9 @@ public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWh
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -659,6 +682,10 @@ public void 
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAt
         activeTasks.put(task1, Collections.singleton(t1p1));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -714,8 +741,10 @@ public void 
shouldReturnActiveTaskMetadataWhileRunningState() {
         activeTasks.put(task1, Collections.singleton(t1p1));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
 
+        final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -883,9 +912,9 @@ public void close() { }
 
         thread.taskManager().setAssignmentMetadata(activeTasks, 
Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         clientSupplier.consumer.assign(assignedPartitions);
         
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
 
@@ -1074,17 +1103,18 @@ public void 
shouldReportSkippedRecordsForInvalidTimestamps() {
         thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
-        final Set<TopicPartition> assignedPartitions = 
Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+        final Set<TopicPartition> assignedPartitions = 
Collections.singleton(t1p1);
         thread.taskManager().setAssignmentMetadata(
             Collections.singletonMap(
                 new TaskId(0, t1p1.partition()),
                 assignedPartitions),
             Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         final MetricName skippedTotalMetric = 
metrics.metricName("skipped-records-total", "stream-metrics", 
Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 648e9b080cd..f78374321d4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
+import org.easymock.IAnswer;
 import org.easymock.Mock;
 import org.easymock.MockType;
 import org.junit.Before;
@@ -117,7 +118,7 @@
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         taskManager = new TaskManager(changeLogReader,
                                       UUID.randomUUID(),
                                       "",
@@ -324,11 +325,9 @@ public void shouldNotAddResumedStandbyTasks() {
         verify(standby, standbyTaskCreator);
     }
 
-
     @Test
-    public void shouldPauseActiveUninitializedPartitions() {
+    public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
-        
EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions);
         consumer.pause(taskId0Partitions);
         EasyMock.expectLastCall();
         replay();
@@ -455,6 +454,9 @@ public void shouldResumeRestoredPartitions() {
         
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(taskId0Partitions);
+        EasyMock.expect(active.allTasksRunning()).andReturn(true);
+        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
+        
EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
 
         consumer.resume(taskId0Partitions);
         EasyMock.expectLastCall();
@@ -626,13 +628,19 @@ public void shouldPunctuateActiveTasks() {
     }
 
     @Test
-    public void shouldResumeConsumptionOfInitializedPartitions() {
+    public void shouldNotResumeConsumptionUntilAllStoresRestored() {
         final Set<TopicPartition> resumed = Collections.singleton(new 
TopicPartition("topic", 0));
         EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
         
EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
         consumer.resume(resumed);
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() {
+                fail("Shouldn't resume partitions until all the stores are 
restored");
+                return null;
+            }
+        }).anyTimes();
 
         EasyMock.replay(active, consumer);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6106
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Guozhang Wang
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>              Labels: newbie++
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very 
> beginning, when A and B are assigned to the thread, the thread state is 
> {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during 
> this state using the restore consumer, while using the normal consumer for 
> heartbeating.
> If task A's restoration completes earlier than task B's, then the thread 
> will start processing A immediately, even though it is still in the 
> {{TASKS_ASSIGNED}} phase. But processing task A will slow down the 
> restoration of task B, since both run on the same single thread. As a 
> result, the thread's transition to {{RUNNING}}, which happens only once all 
> of its assigned tasks have finished restoring and can be processed, will be 
> delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} 
> when all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition will also be delayed by this scenario.
> We had better not start processing ready tasks immediately, but instead 
> focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the 
> overall time of the instance's state transition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to