Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b1313935f -> bc6a3bc6f


MINOR: A few cleanups in KafkaApis and TransactionMarkerChannelManager

Author: Jason Gustafson <[email protected]>

Reviewers: Colin P. Mccabe <[email protected]>, Ismael Juma 
<[email protected]>

Closes #3171 from hachikuji/minor-txn-channel-cleanups


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63605779
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63605779
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63605779

Branch: refs/heads/0.11.0
Commit: 63605779ef67fb39487cf0487c8ac4caa8d39cbc
Parents: b131393
Author: Jason Gustafson <[email protected]>
Authored: Sat Jun 17 02:03:25 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Sat Jun 17 14:04:32 2017 +0100

----------------------------------------------------------------------
 .../common/requests/DeleteAclsResponse.java     |   8 +
 .../kafka/common/InterBrokerSendThread.scala    |  29 +--
 .../TransactionMarkerChannelManager.scala       |  22 +--
 .../scala/kafka/security/SecurityUtils.scala    |  48 +++++
 .../kafka/security/auth/PermissionType.scala    |   5 -
 .../src/main/scala/kafka/server/KafkaApis.scala | 187 ++++++-------------
 .../common/InterBrokerSendThreadTest.scala      |  16 +-
 .../TransactionMarkerChannelManagerTest.scala   |  20 +-
 8 files changed, 151 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 796e200..94cd6aa 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -49,6 +49,10 @@ public class DeleteAclsResponse extends AbstractResponse {
             this.acl = acl;
         }
 
+        public AclDeletionResult(AclBinding acl) {
+            this(null, acl);
+        }
+
         public ApiException exception() {
             return exception;
         }
@@ -72,6 +76,10 @@ public class DeleteAclsResponse extends AbstractResponse {
             this.deletions = deletions;
         }
 
+        public AclFilterResponse(Collection<AclDeletionResult> deletions) {
+            this(null, deletions);
+        }
+
         public Throwable throwable() {
             return throwable;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala 
b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index 886e41c..06158b2 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -27,24 +27,27 @@ import org.apache.kafka.common.utils.Time
 /**
  *  Class for inter-broker send thread that utilize a non-blocking network 
client.
  */
-class InterBrokerSendThread(name: String,
-                            networkClient: NetworkClient,
-                            requestGenerator: () => 
Iterable[RequestAndCompletionHandler],
-                            time: Time,
-                            isInterruptible: Boolean = true)
+abstract class InterBrokerSendThread(name: String,
+                                     networkClient: NetworkClient,
+                                     time: Time,
+                                     isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
 
-  // visible for testing
-  def generateRequests(): Iterable[RequestAndCompletionHandler] = 
requestGenerator()
+  def generateRequests(): Iterable[RequestAndCompletionHandler]
+
+  override def shutdown(): Unit = {
+    initiateShutdown()
+    // wake up the thread in case it is blocked inside poll
+    networkClient.wakeup()
+    awaitShutdown()
+  }
 
   override def doWork() {
     val now = time.milliseconds()
     var pollTimeout = Long.MaxValue
 
     try {
-      val requestsToSend: Iterable[RequestAndCompletionHandler] = 
requestGenerator()
-
-      for (request: RequestAndCompletionHandler <- requestsToSend) {
+      for (request: RequestAndCompletionHandler <- generateRequests()) {
         val destination = Integer.toString(request.destination.id())
         val completionHandler = request.handler
         val clientRequest = networkClient.newClientRequest(destination,
@@ -79,6 +82,10 @@ class InterBrokerSendThread(name: String,
         throw new FatalExitError()
     }
   }
+
+  def wakeup(): Unit = networkClient.wakeup()
+
 }
 
-case class RequestAndCompletionHandler(destination: Node, request: 
AbstractRequest.Builder[_ <: AbstractRequest], handler: 
RequestCompletionHandler)
\ No newline at end of file
+case class RequestAndCompletionHandler(destination: Node, request: 
AbstractRequest.Builder[_ <: AbstractRequest],
+                                       handler: RequestCompletionHandler)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 22f01c1..9c3ffd9 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -122,16 +122,12 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
                                       networkClient: NetworkClient,
                                       txnStateManager: TransactionStateManager,
                                       txnMarkerPurgatory: 
DelayedOperationPurgatory[DelayedTxnMarker],
-                                      time: Time) extends Logging with 
KafkaMetricsGroup {
+                                      time: Time) extends 
InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, 
networkClient, time) with Logging with KafkaMetricsGroup {
 
   this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + 
"]: "
 
   private val interBrokerListenerName: ListenerName = 
config.interBrokerListenerName
 
-  private val txnMarkerSendThread: InterBrokerSendThread = {
-    new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, 
networkClient, drainQueuedTransactionMarkers, time)
-  }
-
   private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new 
ConcurrentHashMap[Int, TxnMarkerQueue]().asScala
 
   private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode)
@@ -152,15 +148,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
     }
   )
 
-  def start(): Unit = {
-    txnMarkerSendThread.start()
-  }
+  override def generateRequests() = drainQueuedTransactionMarkers()
 
-  def shutdown(): Unit = {
-    txnMarkerSendThread.initiateShutdown()
-    // wake up the thread in case it is blocked inside poll
-    networkClient.wakeup()
-    txnMarkerSendThread.awaitShutdown()
+  override def shutdown(): Unit = {
+    super.shutdown()
     txnMarkerPurgatory.shutdown()
     markersQueuePerBroker.clear()
   }
@@ -173,9 +164,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   // visible for testing
   private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker
 
-  // visible for testing
-  private[transaction] def senderThread = txnMarkerSendThread
-
   private[transaction] def addMarkersForBroker(broker: Node, 
txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) {
     val brokerId = broker.id
 
@@ -369,7 +357,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       }
     }
 
-    networkClient.wakeup()
+    wakeup()
   }
 
   def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/SecurityUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala 
b/core/src/main/scala/kafka/security/SecurityUtils.scala
new file mode 100644
index 0000000..bbfc42c
--- /dev/null
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -0,0 +1,48 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.security
+
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, 
ResourceType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, 
AclBindingFilter}
+import org.apache.kafka.common.resource.{Resource => AdminResource, 
ResourceType => AdminResourceType}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+import scala.util.Try
+
+
+object SecurityUtils {
+
+  def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] 
= {
+    for {
+      resourceType <- 
Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
+      principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
+      operation <- Try(Operation.fromJava(filter.entryFilter.operation))
+      permissionType <- 
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
+      resource = Resource(resourceType, filter.resourceFilter.name)
+      acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
+    } yield (resource, acl)
+  }
+
+  def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
+    val adminResource = new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name)
+    val entry = new AccessControlEntry(acl.principal.toString, 
acl.host.toString,
+      acl.operation.toJava, acl.permissionType.toJava)
+    new AclBinding(adminResource, entry)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala 
b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index 686c60b..c603351 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -21,11 +21,6 @@ import org.apache.kafka.common.acl.AclPermissionType
 
 import scala.util.{Failure, Success, Try}
 
-/**
- * PermissionType.
- */
-
-
 sealed trait PermissionType extends BaseEnum {
   val toJava: AclPermissionType
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 27eb816..337c740 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,6 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, 
JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
+import kafka.security.SecurityUtils
 import kafka.security.auth._
 import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
@@ -50,13 +51,12 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.resource.{Resource => AdminResource, 
ResourceType => AdminResourceType}
-import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, 
AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 
 import scala.collection._
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -1782,70 +1782,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  /**
-    * Convert an ACL binding filter to a Scala object.
-    * All ACL and resource fields must be specified (no UNKNOWN, ANY, or null 
fields are allowed.)
-    *
-    * @param filter     The binding filter as a Java object.
-    * @return           The binding filter as a scala object, or an exception 
if there was an error
-    *                   converting the Java object.
-    */
-  def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = {
-    filter.resourceFilter().resourceType() match {
-      case AdminResourceType.UNKNOWN => return Failure(new 
InvalidRequestException("Invalid UNKNOWN resource type"))
-      case AdminResourceType.ANY => return Failure(new 
InvalidRequestException("Invalid ANY resource type"))
-      case _ => {}
-    }
-    val resourceType: ResourceType = try {
-      ResourceType.fromJava(filter.resourceFilter.resourceType)
-    } catch {
-      case throwable: Throwable => return Failure(new 
InvalidRequestException("Invalid resource type"))
-    }
-    val principal: KafkaPrincipal = try {
-      KafkaPrincipal.fromString(filter.entryFilter.principal)
-    } catch {
-      case throwable: Throwable => return Failure(new 
InvalidRequestException("Invalid principal"))
-    }
-    filter.entryFilter().operation() match {
-      case AclOperation.UNKNOWN => return Failure(new 
InvalidRequestException("Invalid UNKNOWN operation type"))
-      case AclOperation.ANY => return Failure(new 
InvalidRequestException("Invalid ANY operation type"))
-      case _ => {}
-    }
-    val operation: Operation = try {
-      Operation.fromJava(filter.entryFilter.operation)
-    } catch {
-      case throwable: Throwable => return Failure(new 
InvalidRequestException(throwable.getMessage))
-    }
-    filter.entryFilter().permissionType() match {
-      case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid 
UNKNOWN permission type")
-      case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY 
permission type")
-      case _ => {}
-    }
-    val permissionType: PermissionType = try {
-      PermissionType.fromJava(filter.entryFilter.permissionType)
-    } catch {
-      case throwable: Throwable => return Failure(new 
InvalidRequestException(throwable.getMessage))
-    }
-    return Success((Resource(resourceType, filter.resourceFilter().name()), 
Acl(principal, permissionType,
-                   filter.entryFilter().host(), operation)))
-  }
-
-  /**
-    * Convert a Scala ACL binding to a Java object.
-    *
-    * @param acl        The binding as a Scala object.
-    * @return           The binding as a Java object.
-    */
-  def toJava(acl: (Resource, Acl)) : AclBinding = {
-    acl match {
-      case (resource, acl) =>
-        val adminResource = new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name)
-        val entry = new AccessControlEntry(acl.principal.toString, 
acl.host.toString,
-                                  acl.operation.toJava, 
acl.permissionType.toJava)
-        return new AclBinding(adminResource, entry)
-    }
-  }
-
   def handleCreateAcls(request: RequestChannel.Request): Unit = {
     authorizeClusterAlter(request)
     val createAclsRequest = request.body[CreateAclsRequest]
@@ -1855,11 +1791,9 @@ class KafkaApis(val requestChannel: RequestChannel,
           createAclsRequest.getErrorResponse(requestThrottleMs,
             new SecurityDisabledException("No Authorizer is configured on the 
broker.")))
       case Some(auth) =>
-        val errors = mutable.HashMap[Int, Throwable]()
-        for (i <- 0 until createAclsRequest.aclCreations.size) {
-          val result = 
toScala(createAclsRequest.aclCreations.get(i).acl.toFilter)
-          result match {
-            case Failure(throwable) => errors.put(i, throwable)
+        val aclCreationResults = createAclsRequest.aclCreations.asScala.map { 
aclCreation =>
+          SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) 
match {
+            case Failure(throwable) => new AclCreationResponse(throwable)
             case Success((resource, acl)) => try {
                 if (resource.resourceType.equals(Cluster) &&
                     !resource.name.equals(Resource.ClusterResourceName))
@@ -1868,25 +1802,19 @@ class KafkaApis(val requestChannel: RequestChannel,
                 if (resource.name.isEmpty)
                   throw new InvalidRequestException("Invalid empty resource 
name")
                 auth.addAcls(immutable.Set(acl), resource)
-                if (logger.isDebugEnabled)
-                  logger.debug(s"Added acl $acl to $resource")
+
+                logger.debug(s"Added acl $acl to $resource")
+
+                new AclCreationResponse(null)
               } catch {
-                case throwable : Throwable => if (logger.isDebugEnabled) {
-                    logger.debug(s"Failed to add acl $acl to $resource", 
throwable)
-                  }
-                  errors.put(i, throwable)
+                case throwable: Throwable =>
+                  logger.debug(s"Failed to add acl $acl to $resource", 
throwable)
+                  new AclCreationResponse(throwable)
               }
           }
         }
-        val aclCreationResults = new java.util.ArrayList[AclCreationResponse]
-        for (i <- 0 to createAclsRequest.aclCreations().size() - 1) {
-          errors.get(i) match {
-            case Some(throwable) => aclCreationResults.add(new 
AclCreationResponse(throwable))
-            case None => aclCreationResults.add(new AclCreationResponse(null))
-          }
-        }
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new CreateAclsResponse(requestThrottleMs, aclCreationResults))
+          new CreateAclsResponse(requestThrottleMs, aclCreationResults.asJava))
     }
   }
 
@@ -1899,60 +1827,53 @@ class KafkaApis(val requestChannel: RequestChannel,
           deleteAclsRequest.getErrorResponse(requestThrottleMs,
             new SecurityDisabledException("No Authorizer is configured on the 
broker.")))
       case Some(auth) =>
-        val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]()
-        val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]()
-        for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-          toDelete.put(i, new ListBuffer[(Resource, Acl)]())
-        }
-        if (deleteAclsRequest.filters().asScala.exists { f => 
!f.matchesAtMostOne() }) {
-          // Delete based on filters that may match more than one ACL.
-          val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
-          aclMap.foreach { case (resource, acls) =>
-            acls.foreach { acl =>
-              val binding = new AclBinding(new AdminResource(AdminResourceType.
-                fromString(resource.resourceType.toString), resource.name),
-                new AccessControlEntry(acl.principal.toString(), 
acl.host.toString(),
-                  acl.operation.toJava, acl.permissionType.toJava))
-              for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-                val filter = deleteAclsRequest.filters().get(i)
-                if (filter.matches(binding)) {
-                  toDelete.get(i).get += ((resource, acl))
-                }
-              }
+        val filters = deleteAclsRequest.filters.asScala
+        val filterResponseMap = mutable.Map[Int, AclFilterResponse]()
+        val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]()
+
+        if (filters.forall(_.matchesAtMostOne)) {
+          // Delete based on a list of ACL fixtures.
+          for ((filter, i) <- filters.zipWithIndex) {
+            SecurityUtils.convertToResourceAndAcl(filter) match {
+              case Failure(throwable) => filterResponseMap.put(i, new 
AclFilterResponse(throwable, Seq.empty.asJava))
+              case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture))
             }
           }
         } else {
-          // Delete based on a list of ACL fixtures.
-          for (i <- 0 to deleteAclsRequest.filters().size - 1) {
-            toScala(deleteAclsRequest.filters().get(i)) match {
-              case Failure(throwable) => filterResponseMap.put(i,
-                new AclFilterResponse(throwable, 
Collections.emptySet[AclDeletionResult]()))
-              case Success(fixture) => toDelete.put(i, ListBuffer(fixture))
-            }
+          // Delete based on filters that may match more than one ACL.
+          val aclMap = auth.getAcls()
+          val filtersWithIndex = filters.zipWithIndex
+          for ((resource, acls) <- aclMap; acl <- acls) {
+            val binding = new AclBinding(
+              new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name),
+              new AccessControlEntry(acl.principal.toString, 
acl.host.toString, acl.operation.toJava,
+                acl.permissionType.toJava))
+
+            for ((filter, i) <- filtersWithIndex if filter.matches(binding))
+              toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, 
acl))
           }
         }
-        for (i <- toDelete.keys) {
-          val deletionResults = new util.ArrayList[AclDeletionResult]()
-          for (acls <- toDelete.get(i)) {
-            for ((resource, acl) <- acls) {
-              try {
-                if (auth.removeAcls(immutable.Set(acl), resource)) {
-                  deletionResults.add(new AclDeletionResult(null, 
toJava((resource, acl))))
-                }
-              } catch {
-                case throwable: Throwable => deletionResults.add(new 
AclDeletionResult(
-                  new UnknownServerException("Failed to delete ACL: " + 
throwable.toString),
-                    toJava((resource, acl))))
-              }
+
+        for ((i, acls) <- toDelete) {
+          val deletionResults = acls.flatMap { case (resource, acl) =>
+            val aclBinding = SecurityUtils.convertToAclBinding(resource, acl)
+            try {
+              if (auth.removeAcls(immutable.Set(acl), resource))
+                Some(new AclDeletionResult(aclBinding))
+              else None
+            } catch {
+              case throwable: Throwable =>
+                Some(new AclDeletionResult(new UnknownServerException(s"Failed 
to delete ACL $acl: $throwable"),
+                  aclBinding))
             }
-          }
-          filterResponseMap.put(i, new AclFilterResponse(null, 
deletionResults))
-        }
-        val filterResponses = new util.ArrayList[AclFilterResponse]
-        for (i <- 0 to deleteAclsRequest.filters().size() - 1) {
-          filterResponses.add(filterResponseMap.getOrElse(i,
-            new AclFilterResponse(null, new 
util.ArrayList[AclDeletionResult]())))
+          }.asJava
+          
+          filterResponseMap.put(i, new AclFilterResponse(deletionResults))
         }
+
+        val filterResponses = filters.indices.map { i =>
+          filterResponseMap.getOrElse(i, new 
AclFilterResponse(Seq.empty.asJava))
+        }.asJava
         sendResponseMaybeThrottle(request, requestThrottleMs => new 
DeleteAclsResponse(requestThrottleMs, filterResponses))
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala 
b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index a508c41..c6ebdd1 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -34,7 +34,9 @@ class InterBrokerSendThreadTest {
 
   @Test
   def shouldNotSendAnythingWhenNoRequests(): Unit = {
-    val sendThread = new InterBrokerSendThread("name", networkClient, () => 
mutable.Iterable.empty, time)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override def generateRequests() = mutable.Iterable.empty
+    }
 
     // poll is always called but there should be no further invocations on 
NetworkClient
     EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong()))
@@ -52,9 +54,9 @@ class InterBrokerSendThreadTest {
     val request = new StubRequestBuilder()
     val node = new Node(1, "", 8080)
     val handler = RequestAndCompletionHandler(node, request, completionHandler)
-    val sendThread = new InterBrokerSendThread("name", networkClient, () => {
-      List[RequestAndCompletionHandler](handler)
-    }, time)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override def generateRequests() = 
List[RequestAndCompletionHandler](handler)
+    }
 
     val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, 
handler.handler)
 
@@ -86,9 +88,9 @@ class InterBrokerSendThreadTest {
     val request = new StubRequestBuilder
     val node = new Node(1, "", 8080)
     val requestAndCompletionHandler = RequestAndCompletionHandler(node, 
request, completionHandler)
-    val sendThread = new InterBrokerSendThread("name", networkClient, () => {
-      List[RequestAndCompletionHandler](requestAndCompletionHandler)
-    }, time)
+    val sendThread = new InterBrokerSendThread("name", networkClient, time) {
+      override def generateRequests() = 
List[RequestAndCompletionHandler](requestAndCompletionHandler)
+    }
 
     val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, 
requestAndCompletionHandler.handler)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 01a350b..9e7fb13 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -73,8 +73,6 @@ class TransactionMarkerChannelManagerTest {
     txnMarkerPurgatory,
     time)
 
-  private val senderThread = channelManager.senderThread
-
   private def mockCache(): Unit = {
     EasyMock.expect(txnStateManager.partitionFor(transactionalId1))
       .andReturn(txnTopicPartition1)
@@ -93,7 +91,7 @@ class TransactionMarkerChannelManagerTest {
 
   @Test
   def shouldGenerateEmptyMapWhenNoRequestsOutstanding(): Unit = {
-    assertTrue(senderThread.generateRequests().isEmpty)
+    assertTrue(channelManager.generateRequests().isEmpty)
   }
 
   @Test
@@ -131,12 +129,12 @@ class TransactionMarkerChannelManagerTest {
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
       Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, 
producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
 
-    val requests: Map[Node, WriteTxnMarkersRequest] = 
senderThread.generateRequests().map { handler =>
+    val requests: Map[Node, WriteTxnMarkersRequest] = 
channelManager.generateRequests().map { handler =>
       (handler.destination, 
handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
     }.toMap
 
     assertEquals(Map(broker1 -> expectedBroker1Request, broker2 -> 
expectedBroker2Request), requests)
-    assertTrue(senderThread.generateRequests().isEmpty)
+    assertTrue(channelManager.generateRequests().isEmpty)
   }
 
   @Test
@@ -208,13 +206,13 @@ class TransactionMarkerChannelManagerTest {
     val expectedBroker2Request = new WriteTxnMarkersRequest.Builder(
       Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, 
producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build()
 
-    val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = 
senderThread.generateRequests().map { handler =>
+    val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = 
channelManager.generateRequests().map { handler =>
       (handler.destination, 
handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
     }.toMap
 
     assertEquals(Map(broker2 -> expectedBroker2Request), firstDrainedRequests)
 
-    val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = 
senderThread.generateRequests().map { handler =>
+    val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = 
channelManager.generateRequests().map { handler =>
       (handler.destination, 
handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build())
     }.toMap
 
@@ -294,7 +292,7 @@ class TransactionMarkerChannelManagerTest {
 
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, 
txnResult, txnMetadata2, txnTransitionMetadata2)
 
-    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
senderThread.generateRequests()
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
channelManager.generateRequests()
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
@@ -342,7 +340,7 @@ class TransactionMarkerChannelManagerTest {
 
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, 
txnResult, txnMetadata2, txnTransitionMetadata2)
 
-    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
senderThread.generateRequests()
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
channelManager.generateRequests()
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
@@ -396,7 +394,7 @@ class TransactionMarkerChannelManagerTest {
 
     channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, 
txnResult, txnMetadata2, txnTransitionMetadata2)
 
-    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
senderThread.generateRequests()
+    val requestAndHandlers: Iterable[RequestAndCompletionHandler] = 
channelManager.generateRequests()
 
     val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE))
     for (requestAndHandler <- requestAndHandlers) {
@@ -404,7 +402,7 @@ class TransactionMarkerChannelManagerTest {
     }
 
     // call this again so that append log will be retried
-    senderThread.generateRequests()
+    channelManager.generateRequests()
 
     EasyMock.verify(txnStateManager)
 

Reply via email to