Repository: kafka
Updated Branches:
  refs/heads/trunk dd8cdb79d -> 2cc8f48ae


KAFKA-5308; TC should handle UNSUPPORTED_FOR_MESSAGE_FORMAT in WriteTxnMarker 
response

Return UNSUPPORTED_FOR_MESSAGE_FORMAT in handleWriteTxnMarkers when a topic does 
not have the correct message format.
Remove any TopicPartitions that have the same error from those waiting for markers.

Author: Damian Guy <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang 
<[email protected]>

Closes #3152 from dguy/kafka-5308


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2cc8f48a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2cc8f48a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2cc8f48a

Branch: refs/heads/trunk
Commit: 2cc8f48ae5358087381fa5f79233789499d6b939
Parents: dd8cdb7
Author: Damian Guy <[email protected]>
Authored: Tue May 30 23:38:50 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue May 30 23:38:50 2017 -0700

----------------------------------------------------------------------
 .../common/requests/ByteBufferChannel.java      |  71 ++++++++++
 .../common/requests/RequestResponseTest.java    |  52 +------
 ...nsactionMarkerRequestCompletionHandler.scala |   9 ++
 .../src/main/scala/kafka/server/KafkaApis.scala |  59 +++++---
 ...tionMarkerRequestCompletionHandlerTest.scala |   6 +
 .../scala/unit/kafka/server/KafkaApisTest.scala | 142 ++++++++++++++++++-
 6 files changed, 266 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java 
b/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java
new file mode 100644
index 0000000..0446df9
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ByteBufferChannel.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+public class ByteBufferChannel implements GatheringByteChannel {
+    private final ByteBuffer buf;
+    private boolean closed = false;
+
+    public ByteBufferChannel(long size) {
+        if (size > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("size should be not be greater 
than Integer.MAX_VALUE");
+        this.buf = ByteBuffer.allocate((int) size);
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length) throws 
IOException {
+        int position = buf.position();
+        for (int i = 0; i < length; i++) {
+            ByteBuffer src = srcs[i].duplicate();
+            if (i == 0)
+                src.position(offset);
+            buf.put(src);
+        }
+        return buf.position() - position;
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs) throws IOException {
+        return write(srcs, 0, srcs.length);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        int position = buf.position();
+        buf.put(src);
+        return buf.position() - position;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return !closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+        buf.flip();
+        closed = true;
+    }
+
+    public ByteBuffer buffer() {
+        return buf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2e9a688..e0f48bf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -53,11 +53,9 @@ import 
org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -479,14 +477,14 @@ public class RequestResponseTest {
         send.writeTo(channel);
         channel.close();
 
-        ByteBuffer buf = channel.buf;
+        ByteBuffer buf = channel.buffer();
 
         // read the size
         int size = buf.getInt();
         assertTrue(size > 0);
 
         // read the header
-        ResponseHeader responseHeader = ResponseHeader.parse(channel.buf);
+        ResponseHeader responseHeader = ResponseHeader.parse(channel.buffer());
         assertEquals(header.correlationId(), responseHeader.correlationId());
 
         // read the body
@@ -1048,52 +1046,6 @@ public class RequestResponseTest {
         return new DeleteAclsResponse(0, responses);
     }
 
-    private static class ByteBufferChannel implements GatheringByteChannel {
-        private final ByteBuffer buf;
-        private boolean closed = false;
-
-        private ByteBufferChannel(long size) {
-            if (size > Integer.MAX_VALUE)
-                throw new IllegalArgumentException("size should be not be 
greater than Integer.MAX_VALUE");
-            this.buf = ByteBuffer.allocate((int) size);
-        }
-
-        @Override
-        public long write(ByteBuffer[] srcs, int offset, int length) throws 
IOException {
-            int position = buf.position();
-            for (int i = 0; i < length; i++) {
-                ByteBuffer src = srcs[i].duplicate();
-                if (i == 0)
-                    src.position(offset);
-                buf.put(src);
-            }
-            return buf.position() - position;
-        }
-
-        @Override
-        public long write(ByteBuffer[] srcs) throws IOException {
-            return write(srcs, 0, srcs.length);
-        }
-
-        @Override
-        public int write(ByteBuffer src) throws IOException {
-            int position = buf.position();
-            buf.put(src);
-            return buf.position() - position;
-        }
-
-        @Override
-        public boolean isOpen() {
-            return !closed;
-        }
-
-        @Override
-        public void close() throws IOException {
-            buf.flip();
-            closed = true;
-        }
-    }
-
     private DescribeConfigsRequest createDescribeConfigsRequest() {
         return new DescribeConfigsRequest.Builder(asList(
                 new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 7c1c356..5fa6035 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -155,6 +155,15 @@ class TransactionMarkerRequestCompletionHandler(brokerId: 
Int,
                       
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
                       abortSending = true
 
+                    case Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT |
+                         Errors.UNSUPPORTED_VERSION =>
+                      // The producer would have failed to send data to the 
failed topic so we can safely remove the partition
+                      // from the set waiting for markers
+                      info(s"Sending $transactionalId's transaction marker 
from partition $topicPartition has failed with " +
+                        s" ${error.name}. This partition will be removed from 
the set of partitions" +
+                        s" waiting for completion")
+                      txnMetadata.removePartition(topicPartition)
+
                     case other =>
                       throw new IllegalStateException(s"Unexpected error 
${other.exceptionName} while sending txn marker for $transactionalId")
                   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 459cb27..d7f6773 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1492,8 +1492,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     def sendResponseCallback(producerId: Long, result: 
TransactionResult)(responseStatus: Map[TopicPartition, PartitionResponse]): 
Unit = {
       trace(s"End transaction marker append for producer id $producerId 
completed with status: $responseStatus")
-      errors.put(producerId, responseStatus.mapValues(_.error).asJava)
-
+      val partitionErrors = responseStatus.mapValues(_.error).asJava
+      val previous = errors.putIfAbsent(producerId, partitionErrors)
+      if (previous != null)
+        previous.putAll(partitionErrors)
+      
       val successfulOffsetsPartitions = responseStatus.filter { case 
(topicPartition, partitionResponse) =>
         topicPartition.topic == GROUP_METADATA_TOPIC_NAME && 
partitionResponse.error == Errors.NONE
       }.keys
@@ -1519,25 +1522,49 @@ class KafkaApis(val requestChannel: RequestChannel,
     // be nice to have only one append to the log. This requires pushing the 
building of the control records
     // into Log so that we only append those having a valid producer epoch, 
and exposing a new appendControlRecord
     // API in ReplicaManager. For now, we've done the simpler approach
+    var skippedMarkers = 0
     for (marker <- markers.asScala) {
       val producerId = marker.producerId
-      val controlRecords = marker.partitions.asScala.map { partition =>
-        val controlRecordType = marker.transactionResult match {
-          case TransactionResult.COMMIT => ControlRecordType.COMMIT
-          case TransactionResult.ABORT => ControlRecordType.ABORT
+      val (goodPartitions, partitionsWithIncorrectMessageFormat) = 
marker.partitions.asScala.partition { partition =>
+        replicaManager.getMagic(partition) match {
+          case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
+          case _ => false
         }
-        val endTxnMarker = new EndTransactionMarker(controlRecordType, 
marker.coordinatorEpoch)
-        partition -> MemoryRecords.withEndTransactionMarker(producerId, 
marker.producerEpoch, endTxnMarker)
-      }.toMap
+      }
 
-      replicaManager.appendRecords(
-        timeout = config.requestTimeoutMs.toLong,
-        requiredAcks = -1,
-        internalTopicsAllowed = true,
-        isFromClient = false,
-        entriesPerPartition = controlRecords,
-        responseCallback = sendResponseCallback(producerId, 
marker.transactionResult))
+      if (partitionsWithIncorrectMessageFormat.nonEmpty) {
+        val partitionErrors = new util.HashMap[TopicPartition, Errors]()
+        partitionsWithIncorrectMessageFormat.foreach { partition => 
partitionErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
+        errors.put(producerId, partitionErrors)
+      }
+
+      if (goodPartitions.isEmpty) {
+        numAppends.decrementAndGet()
+        skippedMarkers += 1
+      } else {
+        val controlRecords = goodPartitions.map { partition =>
+          val controlRecordType = marker.transactionResult match {
+            case TransactionResult.COMMIT => ControlRecordType.COMMIT
+            case TransactionResult.ABORT => ControlRecordType.ABORT
+          }
+          val endTxnMarker = new EndTransactionMarker(controlRecordType, 
marker.coordinatorEpoch)
+          partition -> MemoryRecords.withEndTransactionMarker(producerId, 
marker.producerEpoch, endTxnMarker)
+        }.toMap
+
+        replicaManager.appendRecords(
+          timeout = config.requestTimeoutMs.toLong,
+          requiredAcks = -1,
+          internalTopicsAllowed = true,
+          isFromClient = false,
+          entriesPerPartition = controlRecords,
+          responseCallback = sendResponseCallback(producerId, 
marker.transactionResult))
+      }
     }
+
+    // No log appends were written as all partitions had incorrect log format
+    // so we need to send the error response
+    if (skippedMarkers == markers.size())
+      sendResponseExemptThrottle(request, () => sendResponse(request, new 
WriteTxnMarkersResponse(errors)))
   }
 
   def ensureInterBrokerVersion(version: ApiVersion): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index a255830..e3e67b7 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -187,6 +187,12 @@ class TransactionMarkerRequestCompletionHandlerTest {
     verifyRetriesPartitionOnError(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
   }
 
+  @Test
+  def shouldRemoveTopicPartitionFromWaitingSetOnUnsupportedForMessageFormat(): 
Unit = {
+    mockCache()
+    
verifyCompleteDelayedOperationOnError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)
+  }
+
   private def verifyRetriesPartitionOnError(error: Errors) = {
     mockCache()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2cc8f48a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d22a5a0..fb17543 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -18,11 +18,15 @@
 package unit.kafka.server
 
 
+import java.net.InetAddress
+import java.util
+
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.network.RequestChannel
+import kafka.network.RequestChannel.Session
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server._
@@ -30,14 +34,19 @@ import kafka.utils.{MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequestResponse, 
AddPartitionsToTxnRequest, RequestHeader}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
+import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Utils
-import org.easymock.EasyMock
-import org.junit.{Before, Test}
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.{Assert, Before, Test}
 
-import scala.collection.JavaConverters
+import scala.collection.JavaConverters._
+import scala.collection.Map
 
 
 class KafkaApisTest {
@@ -53,7 +62,10 @@ class KafkaApisTest {
   private val metrics = new Metrics()
   private val brokerId = 1
   private val authorizer: Option[Authorizer] = None
-  private val quotas = EasyMock.createNiceMock(classOf[QuotaManagers])
+  private val clientQuotaManager = 
EasyMock.createNiceMock(classOf[ClientQuotaManager])
+  private val clientRequestQuotaManager = 
EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+  private val replicaQuotaManager = 
EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+  private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, 
clientRequestQuotaManager, replicaQuotaManager, replicaQuotaManager)
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"
   private val time = new MockTime
@@ -107,5 +119,121 @@ class KafkaApisTest {
   def 
shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported():
 Unit = {
     createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null)
   }
-  
+
+  @Test
+  def 
shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired():
 Unit = {
+    val topicPartition = new TopicPartition("t", 0)
+    val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: 
RequestChannel.Request) = 
createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+    val expectedErrors = Map(topicPartition -> 
Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT).asJava
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.getMagic(topicPartition))
+      .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+
+    createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
+
+    val send = capturedResponse.getValue.responseSend.get
+    val channel = new ByteBufferChannel(send.size())
+    send.writeTo(channel)
+    channel.close()
+
+    // read the size
+    channel.buffer.getInt()
+
+    val responseHeader = ResponseHeader.parse(channel.buffer)
+    val struct = 
ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer)
+
+    val markersResponse = new WriteTxnMarkersResponse(struct)
+    Assert.assertEquals(expectedErrors, markersResponse.errors(1))
+  }
+
+  @Test
+  def 
shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition():
 Unit = {
+    val tp1 = new TopicPartition("t", 0)
+    val tp2 = new TopicPartition("t1", 0)
+    val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: 
RequestChannel.Request) = createWriteTxnMarkersRequest(Utils.mkList(tp1, tp2))
+    val expectedErrors = Map(tp1 -> Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, tp2 
-> Errors.NONE).asJava
+
+    val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+    val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => 
Unit]  = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.getMagic(tp1))
+      .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
+    EasyMock.expect(replicaManager.getMagic(tp2))
+      .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      EasyMock.eq(true),
+      EasyMock.eq(false),
+      EasyMock.anyObject(),
+      EasyMock.capture(responseCallback),
+      EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
+      override def answer(): Unit = {
+        responseCallback.getValue.apply(Map(tp2 -> new 
PartitionResponse(Errors.NONE)))
+      }
+    })
+
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(replicaManager, replicaQuotaManager, requestChannel)
+
+
+    createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
+
+    val send = capturedResponse.getValue.responseSend.get
+    val channel = new ByteBufferChannel(send.size())
+    send.writeTo(channel)
+    channel.close()
+
+    // read the size
+    channel.buffer.getInt()
+
+    val responseHeader = ResponseHeader.parse(channel.buffer)
+    val struct = 
ApiKeys.WRITE_TXN_MARKERS.responseSchema(writeTxnMarkersRequest.version()).read(channel.buffer)
+
+    val markersResponse = new WriteTxnMarkersResponse(struct)
+    Assert.assertEquals(expectedErrors, markersResponse.errors(1))
+    EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
+    val topicPartition = new TopicPartition("t", 0)
+    val (writeTxnMarkersRequest: WriteTxnMarkersRequest, request: 
RequestChannel.Request) = 
createWriteTxnMarkersRequest(Utils.mkList(topicPartition))
+    EasyMock.expect(replicaManager.getMagic(topicPartition))
+      .andReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      EasyMock.eq(true),
+      EasyMock.eq(false),
+      EasyMock.anyObject(),
+      EasyMock.anyObject(),
+      EasyMock.anyObject()))
+
+    EasyMock.replay(replicaManager)
+
+    createKafkaApis(KAFKA_0_11_0_IV0).handleWriteTxnMarkersRequest(request)
+    EasyMock.verify(replicaManager)
+  }
+
+  private def createWriteTxnMarkersRequest(partitions: 
util.List[TopicPartition]) = {
+    val writeTxnMarkersRequest = new 
WriteTxnMarkersRequest.Builder(Utils.mkList(
+    new TxnMarkerEntry(1, 1.toShort, 0, TransactionResult.COMMIT, 
partitions))).build()
+    val header = new RequestHeader(ApiKeys.WRITE_TXN_MARKERS.id, 
writeTxnMarkersRequest.version(), "", 0)
+    val byteBuffer = writeTxnMarkersRequest.serialize(header)
+
+    val request = RequestChannel.Request(1, "1",
+    Session(KafkaPrincipal.ANONYMOUS,
+    InetAddress.getLocalHost),
+    byteBuffer, 0,
+    new ListenerName(""),
+    SecurityProtocol.PLAINTEXT)
+    (writeTxnMarkersRequest, request)
+  }
+
+
 }

Reply via email to