Repository: kafka Updated Branches: refs/heads/trunk 2181ae768 -> b3a33ce4b
KAFKA-5188; Integration tests for transactions Author: Apurva Mehta <[email protected]> Author: Jason Gustafson <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #2994 from apurvam/KAFKA-5188-exactly-once-integration-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3a33ce4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3a33ce4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3a33ce4 Branch: refs/heads/trunk Commit: b3a33ce4b81a20ae5635cf28490fd2e1f9d86141 Parents: 2181ae7 Author: Apurva Mehta <[email protected]> Authored: Wed May 17 16:20:33 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 17 16:20:33 2017 -0700 ---------------------------------------------------------------------- .../clients/producer/internals/Sender.java | 59 ++- .../producer/internals/TransactionManager.java | 25 +- .../kafka/common/requests/EndTxnRequest.java | 4 + .../internals/TransactionManagerTest.java | 47 ++- .../transaction/TransactionCoordinator.scala | 24 +- .../transaction/TransactionMetadata.scala | 2 +- .../transaction/TransactionStateManager.scala | 1 - .../kafka/api/TransactionsBounceTest.scala | 185 +++++++++ .../kafka/api/TransactionsTest.scala | 400 +++++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 95 ++++- 10 files changed, 806 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 209a979..7180171 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NetworkClientUtils; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -39,7 +40,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.clients.NetworkClientUtils; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.InitProducerIdRequest; @@ -187,10 +187,12 @@ public class Sender implements Runnable { * @param now The current POSIX time in milliseconds */ void run(long now) { - long pollTimeout = 0; - if (!maybeSendTransactionalRequest(now)) + long pollTimeout = retryBackoffMs; + if (!maybeSendTransactionalRequest(now)) { pollTimeout = sendProducerData(now); + } + log.trace("waiting {}ms in poll", pollTimeout); this.client.poll(pollTimeout, now); } @@ -203,6 +205,7 @@ public class Sender implements Runnable { final KafkaException exception = transactionManager.lastError() instanceof KafkaException ? (KafkaException) transactionManager.lastError() : new KafkaException(transactionManager.lastError()); + log.error("aborting producer batches because the transaction manager is in an error state.", exception); this.accumulator.abortBatches(exception); return Long.MAX_VALUE; } @@ -281,25 +284,35 @@ public class Sender implements Runnable { } private boolean maybeSendTransactionalRequest(long now) { - if (transactionManager != null && transactionManager.hasInflightRequest()) + if (transactionManager == null || !transactionManager.isTransactional()) + return false; + + if (transactionManager.hasInflightRequest()) { + log.trace("TransactionalId: {} -- There is already an inflight transactional request. Going to wait for the response.", + transactionManager.transactionalId()); return true; + } - if (transactionManager == null || !transactionManager.hasPendingTransactionalRequests()) + if (!transactionManager.hasPendingTransactionalRequests()) { + log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId()); return false; + } TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(); - if (nextRequestHandler.isEndTxn()) { - if (transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) { - if (!accumulator.flushInProgress()) - accumulator.beginFlush(); - transactionManager.reenqueue(nextRequestHandler); - return false; - } else if (transactionManager.isInErrorState()) { - nextRequestHandler.fatal(new KafkaException("Cannot commit transaction when there are " + - "request errors. Please check your logs for the details of the errors encountered.")); - return false; - } + if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) { + if (!accumulator.flushInProgress()) + accumulator.beginFlush(); + transactionManager.reenqueue(nextRequestHandler); + log.trace("TransactionalId: {} -- Going to wait for pending ProducerBatches to flush before sending an " + + "end transaction request", transactionManager.transactionalId()); + return false; + } + + if (transactionManager.maybeTerminateRequestWithError(nextRequestHandler)) { + log.trace("TransactionalId: {} -- Not sending a transactional request because we are in an error state", + transactionManager.transactionalId()); + return false; } Node targetNode = null; @@ -314,7 +327,6 @@ public class Sender implements Runnable { } if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) { transactionManager.lookupCoordinator(nextRequestHandler); - targetNode = null; break; } } else { @@ -322,23 +334,30 @@ public class Sender implements Runnable { } if (targetNode != null) { if (nextRequestHandler.isRetry()) { + log.trace("TransactionalId: {} -- Waiting {}ms before resending a transactional request {}", + transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder()); time.sleep(retryBackoffMs); } ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(), now, true, nextRequestHandler); transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId()); + log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(), + nextRequestHandler.requestBuilder(), clientRequest.destination()); client.send(clientRequest, now); return true; } } catch (IOException e) { - log.warn("Got an exception when trying to find a node to send a transactional request to. Going to back off and retry", e); + targetNode = null; + log.warn("TransactionalId: " + transactionManager.transactionalId() + " -- Got an exception when trying " + + "to find a node to send transactional request " + nextRequestHandler.requestBuilder() + ". Going to back off and retry", e); } + log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.", + transactionManager.transactionalId(), retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); } - if (targetNode == null) - transactionManager.retry(nextRequestHandler); + transactionManager.retry(nextRequestHandler); return true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 551a75a..55c1782 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -95,7 +95,8 @@ public class TransactionManager { case INITIALIZING: return source == UNINITIALIZED || source == ERROR; case READY: - return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION; + return source == INITIALIZING || source == COMMITTING_TRANSACTION + || source == ABORTING_TRANSACTION || source == ERROR; case IN_TRANSACTION: return source == READY; case COMMITTING_TRANSACTION: @@ -246,7 +247,7 @@ public class TransactionManager { } public boolean isInErrorState() { - return currentState == State.ERROR; + return currentState == State.ERROR || currentState == State.FENCED; } public synchronized void setError(Exception exception) { @@ -256,6 +257,23 @@ public class TransactionManager { transitionTo(State.ERROR, exception); } + boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) { + if (isInErrorState() && requestHandler.isEndTxn()) { + // We shouldn't terminate abort requests from error states. + EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler; + if (endTxnHandler.builder.result() == TransactionResult.ABORT) + return false; + String errorMessage = "Cannot commit transaction because at least one previous transactional request " + + "was not completed successfully."; + if (lastError != null) + requestHandler.fatal(new KafkaException(errorMessage, lastError)); + else + requestHandler.fatal(new KafkaException(errorMessage)); + return true; + } + return false; + } + /** * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to * verify that the result is valid. @@ -484,6 +502,7 @@ public class TransactionManager { "transactional.id has been started: producerId: {}. epoch: {}.", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch); result.setError(Errors.INVALID_PRODUCER_EPOCH.exception()); + lastError = Errors.INVALID_PRODUCER_EPOCH.exception(); transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception()); result.done(); } @@ -501,10 +520,12 @@ public class TransactionManager { } else { clearInFlightRequestCorrelationId(); if (response.wasDisconnected()) { + log.trace("disconnected from " + response.destination() + ". Will retry."); reenqueue(); } else if (response.versionMismatch() != null) { fatal(response.versionMismatch()); } else if (response.hasResponse()) { + log.trace("Got transactional response for request:" + requestBuilder()); handleResponse(response.responseBody()); } else { fatal(new KafkaException("Could not execute transactional request for unknown reasons")); http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index ff9b82c..77ec137 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -42,6 +42,10 @@ public class EndTxnRequest extends AbstractRequest { this.result = result; } + public TransactionResult result() { + return result; + } + @Override public EndTxnRequest build(short version) { return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result); http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 4db0452..6a35061 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -433,7 +433,7 @@ public class TransactionManagerTest { transactionManager.maybeAddPartitionToTransaction(tp0); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), - "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future; + "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; assertFalse(responseFuture.isDone()); prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); @@ -442,6 +442,8 @@ public class TransactionManagerTest { sender.run(time.milliseconds()); // send produce. + assertTrue(responseFuture.isDone()); + assertTrue(transactionManager.isInErrorState()); responseFuture.get(); } @@ -500,6 +502,48 @@ public class TransactionManagerTest { } @Test + public void testAllowAbortOnProduceFailure() throws InterruptedException { + client.setNode(brokerNode); + // This is called from the initTransactions method in the producer as the first order of business. + // It finds the coordinator and then gets a PID. + final long pid = 13131L; + final short epoch = 1; + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); + + sender.run(time.milliseconds()); // find coordinator + sender.run(time.milliseconds()); + + prepareInitPidResponse(Errors.NONE, false, pid, epoch); + sender.run(time.milliseconds()); // get pid. + + assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION)); + assertTrue(transactionManager.hasProducerId()); + + transactionManager.beginTransaction(); + transactionManager.maybeAddPartitionToTransaction(tp0); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), + "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future; + + TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction(); + assertFalse(responseFuture.isDone()); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid); + prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch); + + sender.run(time.milliseconds()); // Send AddPartitionsRequest + assertFalse(abortResult.isCompleted()); + + sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException. + sender.run(time.milliseconds()); // try to abort + assertTrue(abortResult.isCompleted()); + assertTrue(abortResult.isSuccessful()); + assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now. + } + + + @Test public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception { verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED); } @@ -519,6 +563,7 @@ public class TransactionManagerTest { final long pid = 1L; final short epoch = 1; + prepareInitPidResponse(Errors.NONE, false, pid, epoch); sender.run(time.milliseconds()); // get pid. http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 233f7d7..8148cb6 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -271,6 +271,14 @@ class TransactionCoordinator(brokerId: Int, txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId) } + private def logInvalidStateTransitionAndReturnError(transactionalId: String, + transactionState: TransactionState, + transactionResult: TransactionResult) = { + error(s"transactionalId: $transactionalId -- Current state is $transactionState, but received transaction " + + s"marker result: $transactionResult") + Left(Errors.INVALID_TXN_STATE) + } + def handleEndTransaction(transactionalId: String, producerId: Long, producerEpoch: Short, @@ -306,24 +314,24 @@ class TransactionCoordinator(brokerId: Int, if (txnMarkerResult == TransactionResult.COMMIT) Left(Errors.NONE) else - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case CompleteAbort => if (txnMarkerResult == TransactionResult.ABORT) Left(Errors.NONE) else - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case PrepareCommit => if (txnMarkerResult == TransactionResult.COMMIT) Left(Errors.CONCURRENT_TRANSACTIONS) else - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case PrepareAbort => if (txnMarkerResult == TransactionResult.ABORT) Left(Errors.CONCURRENT_TRANSACTIONS) else - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case Empty => - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) } } } @@ -349,15 +357,15 @@ class TransactionCoordinator(brokerId: Int, Left(Errors.CONCURRENT_TRANSACTIONS) else txnMetadata.state match { case Empty| Ongoing | CompleteCommit | CompleteAbort => - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) case PrepareCommit => if (txnMarkerResult != TransactionResult.COMMIT) - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) else Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) case PrepareAbort => if (txnMarkerResult != TransactionResult.ABORT) - Left(Errors.INVALID_TXN_STATE) + logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult) else Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds())) } http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala index d05676b..0d176aa 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala @@ -135,7 +135,7 @@ private[transaction] class TransactionMetadata(val producerId: Long, } def removePartition(topicPartition: TopicPartition): Unit = { - if (pendingState.isDefined || (state != PrepareCommit && state != PrepareAbort)) + if (pendingState.isDefined && (state != PrepareCommit && state != PrepareAbort)) throw new IllegalStateException(s"Transation metadata's current state is $state, and its pending state is $state " + s"while trying to remove partitions whose txn marker has been sent, this is not expected") http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index cf41fc3..2327213 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -155,7 +155,6 @@ class TransactionStateManager(brokerId: Int, props.put(LogConfig.UncleanLeaderElectionEnableProp, "false") props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name) props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - props.put(LogConfig.MinInSyncReplicasProp, config.transactionLogMinInsyncReplicas.toString) props.put(LogConfig.SegmentBytesProp, config.transactionLogSegmentBytes.toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala new file mode 100644 index 0000000..f1fd365 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.util.Properties + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{ShutdownableThread, TestUtils} +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.{Ignore, Test} + +import scala.collection.JavaConversions._ +import org.junit.Assert._ + + +class TransactionsBounceTest extends KafkaServerTestHarness { + private val producerBufferSize = 65536 + private val serverMessageMaxBytes = producerBufferSize/2 + private val numPartitions = 3 + + val numServers = 4 + private val outputTopic = "output-topic" + private val inputTopic = "input-topic" + + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) + overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString) + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString) + overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString) + overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) + overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) + overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 3.toString) + overridingProps.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) + overridingProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 1.toString) + overridingProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 3.toString) + overridingProps.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout + overridingProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") + + + // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient + // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing + // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation + // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have + // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a + // running server. + // + // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving + // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems. + override def generateConfigs() = { + FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true) + .map(KafkaConfig.fromProps(_, overridingProps)) + } + + @Ignore // need to fix KAFKA-5268 and KAFKA-5269 before re-enabling + @Test + def testBrokerFailure() { + // basic idea is to seed a topic with 10000 records, and copy it transactionally while bouncing brokers + // constantly through the period. + val consumerGroup= "myGroup" + val numInputRecords = 5000 + createTopics() + + TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers) + + var consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic)) + val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers) + + val scheduler = new BounceScheduler + producer.initTransactions() + scheduler.start() + + var numMessagesProcessed = 0 + var iteration = 0 + try { + while (numMessagesProcessed < numInputRecords) { + val toRead = Math.min(200, numInputRecords - numMessagesProcessed) + trace(s"$iteration: About to read $toRead messages, processed $numMessagesProcessed so far..") + val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead) + trace(s"received ${records.size} messages. sending them transactionally to $outputTopic") + producer.beginTransaction() + val shouldAbort = iteration % 10 == 0 + records.zipWithIndex.foreach { case (record, i) => + producer.send( + TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, !shouldAbort), + new ErrorLoggingCallback(outputTopic, record.key, record.value, true)) + } + trace(s"Sent ${records.size} messages. Committing offsets.") + producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup) + if (shouldAbort) { + trace(s"Committed offsets. Aborting transaction of ${records.size} messages.") + producer.abortTransaction() + consumer.close() + consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic)) + } else { + trace(s"Committed offsets. committing transaction of ${records.size} messages.") + producer.commitTransaction() + numMessagesProcessed += records.size + } + iteration += 1 + } + } finally { + producer.close() + consumer.close() + } + + scheduler.shutdown() + + val verifyingConsumer = createConsumerAndSubscribeToTopics("randoGroup", List(outputTopic), readCommitted = true) + val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { case(record) => + TestUtils.assertCommittedAndGetValue(record).toInt + } + val recordSet = outputRecords.toSet + assertEquals(numInputRecords, recordSet.size) + + val expectedValues = (0 until numInputRecords).toSet + assertEquals(s"Missing messages: ${expectedValues -- recordSet}", expectedValues, recordSet) + + verifyingConsumer.close() + } + + private def createConsumerAndSubscribeToTopics(groupId: String, topics: List[String], readCommitted: Boolean = false) = { + val props = new Properties() + if (readCommitted) + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200") + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000") + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000") + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props)) + consumer.subscribe(topics) + consumer + } + + private def createTopics() = { + val topicConfig = new Properties() + topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) + TestUtils.createTopic(zkUtils, inputTopic, numPartitions, numServers, servers, topicConfig) + TestUtils.createTopic(zkUtils, outputTopic, numPartitions, numServers, servers, topicConfig) + } + + private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) { + override def doWork(): Unit = { + for (server <- servers) { + trace("Shutting down server : %s".format(server.config.brokerId)) + server.shutdown() + server.awaitShutdown() + Thread.sleep(500) + trace("Server %s shut down. Starting it up again.".format(server.config.brokerId)) + server.startup() + trace("Restarted server: %s".format(server.config.brokerId)) + Thread.sleep(500) + } + + (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, outputTopic, partition)) + } + + override def shutdown(){ + super.shutdown() + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala new file mode 100644 index 0000000..3e19bb9 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.util.Properties + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.ProducerFencedException +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.{After, Before, Ignore, Test} +import org.junit.Assert._ + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionException +import scala.util.Random + +class TransactionsTest extends KafkaServerTestHarness { + val numServers = 3 + val topic1 = "topic1" + val topic2 = "topic2" + + + override def generateConfigs : Seq[KafkaConfig] = { + TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps())) + } + + @Before + override def setUp(): Unit = { + super.setUp() + val numPartitions = 3 + val topicConfig = new Properties() + topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString) + TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig) + TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig) + } + + @After + override def tearDown(): Unit = { + super.tearDown() + } + + @Test + def testBasicTransactions() = { + val producer = TestUtils.createTransactionalProducer("my-hello-world-transactional-id", servers) + val consumer = transactionalConsumer("transactional-group") + val unCommittedConsumer = nonTransactionalConsumer("non-transactional-group") + try { + producer.initTransactions() + + producer.beginTransaction() + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false)) + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false)) + producer.abortTransaction() + + producer.beginTransaction() + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true)) + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true)) + producer.commitTransaction() + + consumer.subscribe(List(topic1, topic2)) + unCommittedConsumer.subscribe(List(topic1, topic2)) + + val records = pollUntilExactlyNumRecords(consumer, 2) + records.zipWithIndex.foreach { case (record, i) => + TestUtils.assertCommittedAndGetValue(record) + } + + val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4) + val expectedValues = List("1", "2", "3", "4").toSet + allRecords.zipWithIndex.foreach { case (record, i) => + assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record))) + } + } finally { + consumer.close() + producer.close() + unCommittedConsumer.close() + } + } + + @Test + def testSendOffsets() = { + // The basic plan for the test is as follows: + // 1. Seed topic1 with 1000 unique, numbered, messages. + // 2. Run a consume/process/produce loop to transactionally copy messages from topic1 to topic2 and commit + // offsets as part of the transaction. + // 3. Randomly abort transactions in step2. + // 4. Validate that we have 1000 unique committed messages in topic2. If the offsets were committed properly with the + // transactions, we should not have any duplicates or missing messages since we should process in the input + // messages exactly once. + + val transactionalId = "foobar-id" + val consumerGroupId = "foobar-consumer-group" + val numSeedMessages = 500 + + TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, servers) + + val producer = TestUtils.createTransactionalProducer(transactionalId, servers) + + var consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) + consumer.subscribe(List(topic1)) + producer.initTransactions() + + val random = new Random() + var shouldCommit = false + var recordsProcessed = 0 + try { + while (recordsProcessed < numSeedMessages) { + producer.beginTransaction() + shouldCommit = !shouldCommit + + val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed)) + records.zipWithIndex.foreach { case (record, i) => + val key = new String(record.key(), "UTF-8") + val value = new String(record.value(), "UTF-8") + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit)) + } + + producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroupId) + if (shouldCommit) { + producer.commitTransaction() + recordsProcessed += records.size + debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " + + s"records written to $topic2: $recordsProcessed") + } else { + producer.abortTransaction() + debug(s"aborted transaction Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " + + s"records written to $topic2: $recordsProcessed") + consumer.close() + consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) + consumer.subscribe(List(topic1)) + } + } + } finally { + producer.close() + consumer.close() + } + + // Inspite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not + // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally. + val verifyingConsumer = transactionalConsumer("foobargroup") + verifyingConsumer.subscribe(List(topic2)) + val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map { record => + TestUtils.assertCommittedAndGetValue(record).toInt + } + verifyingConsumer.close() + val valueSet = valueSeq.toSet + assertEquals(s"Expected $numSeedMessages values in $topic2.", numSeedMessages, valueSeq.size) + assertEquals(s"Expected ${valueSeq.size} unique messages in $topic2.", valueSeq.size, valueSet.size) + } + + @Test + def testFencingOnCommit() = { + val transactionalId = "my-t.id" + val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers) + val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers) + val consumer = transactionalConsumer() + consumer.subscribe(List(topic1, topic2)) + + try { + producer1.initTransactions() + + producer1.beginTransaction() + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false)) + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false)) + + producer2.initTransactions() // ok, will abort the open transaction. + producer2.beginTransaction() + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)) + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)) + + try { + producer1.commitTransaction() + fail("Should not be able to commit transactions from a fenced producer.") + } catch { + case e : ProducerFencedException => + // good! + producer1.close() + case e : Exception => + fail("Got an unexpected exception from a fenced producer.", e) + } + + producer2.commitTransaction() // ok + + val records = pollUntilExactlyNumRecords(consumer, 2) + records.zipWithIndex.foreach { case (record, i) => + TestUtils.assertCommittedAndGetValue(record) + } + } finally { + consumer.close() + producer2.close() + } + } + + @Test + def testFencingOnSendOffsets() = { + val transactionalId = "my-t.id" + val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers) + val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers) + val consumer = transactionalConsumer() + consumer.subscribe(List(topic1, topic2)) + + try { + producer1.initTransactions() + + producer1.beginTransaction() + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false)) + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false)) + + producer2.initTransactions() // ok, will abort the open transaction. + producer2.beginTransaction() + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)) + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)) + + try { + producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)), "foobarGroup") + fail("Should not be able to send offsets from a fenced producer.") + } catch { + case e : ProducerFencedException => + // good! + producer1.close() + case e : Exception => + fail("Got an unexpected exception from a fenced producer.", e) + } + + producer2.commitTransaction() // ok + + val records = pollUntilExactlyNumRecords(consumer, 2) + records.zipWithIndex.foreach { case (record, i) => + TestUtils.assertCommittedAndGetValue(record) + } + } finally { + consumer.close() + producer2.close() + } + } + + @Ignore @Test + def testFencingOnSend() { + val transactionalId = "my-t.id" + val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers) + val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers) + val consumer = transactionalConsumer() + consumer.subscribe(List(topic1, topic2)) + + try { + producer1.initTransactions() + + producer1.beginTransaction() + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false)) + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false)) + + producer2.initTransactions() // ok, will abort the open transaction. + producer2.beginTransaction() + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get() + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get() + + try { + val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false)) + val recordMetadata = result.get() + error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!") + servers.foreach { case (server) => + error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}") + } + fail("Should not be able to send messages from a fenced producer.") + } catch { + case e : ProducerFencedException => + producer1.close() + case e : ExecutionException => + assertTrue(e.getCause.isInstanceOf[ProducerFencedException]) + producer1.close() + case e : Exception => + fail("Got an unexpected exception from a fenced producer.", e) + } + + producer2.commitTransaction() // ok + + val records = pollUntilExactlyNumRecords(consumer, 2) + records.zipWithIndex.foreach { case (record, i) => + TestUtils.assertCommittedAndGetValue(record) + } + } finally { + consumer.close() + producer2.close() + } + } + + @Test + def testFencingOnAddPartitions(): Unit = { + val transactionalId = "my-t.id" + val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers) + val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers) + val consumer = transactionalConsumer() + consumer.subscribe(List(topic1, topic2)) + + try { + producer1.initTransactions() + + producer1.beginTransaction() + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false)) + producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false)) + producer1.abortTransaction() + + producer2.initTransactions() // ok, will abort the open transaction. + producer2.beginTransaction() + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get() + producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get() + + try { + producer1.beginTransaction() + val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false)) + val recordMetadata = result.get() + error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!") + servers.foreach { case (server) => + error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}") + } + fail("Should not be able to send messages from a fenced producer.") + } catch { + case e : ProducerFencedException => + producer1.close() + case e : ExecutionException => + assertTrue(e.getCause.isInstanceOf[ProducerFencedException]) + producer1.close() + case e : Exception => + fail("Got an unexpected exception from a fenced producer.", e) + } + + producer2.commitTransaction() // ok + + val records = pollUntilExactlyNumRecords(consumer, 2) + records.zipWithIndex.foreach { case (record, i) => + TestUtils.assertCommittedAndGetValue(record) + } + } finally { + consumer.close() + producer2.close() + } + } + + private def serverProps() = { + val serverProps = new Properties() + serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString) + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString) + serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString) + serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString) + serverProps.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString) + serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString) + serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString) + serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString) + serverProps + } + + private def transactionalConsumer(group: String = "group", maxPollRecords: Int = 500) = { + val props = new Properties() + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString) + TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), + groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props)) + } + + private def nonTransactionalConsumer(group: String = "group") = { + val props = new Properties() + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted") + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), + groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props)) + } + + private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = { + val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]() + TestUtils.waitUntilTrue(() => { + records ++= consumer.poll(50) + records.size == numRecords + }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.") + records + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 6bee18d..90dcacd 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -20,7 +20,7 @@ package kafka.utils import java.io._ import java.nio._ import java.nio.channels._ -import java.nio.charset.Charset +import java.nio.charset.{Charset, StandardCharsets} import java.security.cert.X509Certificate import java.util.{ArrayList, Collections, Properties} import java.util.concurrent.{Callable, Executors, TimeUnit} @@ -40,9 +40,10 @@ import kafka.server._ import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils._ import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor} +import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Header import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.protocol.SecurityProtocol @@ -55,8 +56,9 @@ import org.apache.zookeeper.ZooDefs._ import org.apache.zookeeper.data.ACL import org.junit.Assert._ +import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ -import scala.collection.Map +import scala.collection.{Map, mutable} import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.Try @@ -75,6 +77,10 @@ object TestUtils extends Logging { /** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */ val MockZkConnect = "127.0.0.1:" + MockZkPort + private val transactionStatusKey = "transactionStatus" + private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8) + private val abortedValue : Array[Byte] = "aborted".getBytes(StandardCharsets.UTF_8) + /** * Create a temporary directory */ @@ -1330,6 +1336,89 @@ object TestUtils extends Logging { assertEquals("Consumed more records than expected", numMessages, records.size) records } + + def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer]) = { + val props = new Properties() + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props)) + } + + // Seeds the given topic with records with keys and values in the range [0..numRecords) + def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Int = { + val props = new Properties() + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + var recordsWritten = 0 + val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props)) + try { + for (i <- 0 until numRecords) { + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString))) + recordsWritten += 1 + } + producer.flush() + } finally { + producer.close() + } + recordsWritten + } + + private def asString(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) + + private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8) + + // Verifies that the record was intended to be committed by checking the the headers for an expected transaction status + // If true, this will return the value as a string. It is expected that the record in question should have been created + // by the `producerRecordWithExpectedTransactionStatus` method. + def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = { + record.headers.headers(transactionStatusKey).headOption match { + case Some(header) => + assertEquals(s"Got ${asString(header.value)} but expected the value to indicate " + + s"committed status.", asString(committedValue), asString(header.value)) + case None => + fail("expected the record header to include an expected transaction status, but received nothing.") + } + recordValueAsString(record) + } + + def recordValueAsString(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = { + asString(record.value) + } + + def producerRecordWithExpectedTransactionStatus(topic: String, key: Array[Byte], value: Array[Byte], + willBeCommitted: Boolean) : ProducerRecord[Array[Byte], Array[Byte]] = { + val header = new Header {override def key() = transactionStatusKey + override def value() = if (willBeCommitted) + committedValue + else + abortedValue + } + new ProducerRecord[Array[Byte], Array[Byte]](topic, null, key, value, List(header)) + } + + def producerRecordWithExpectedTransactionStatus(topic: String, key: String, value: String, + willBeCommitted: Boolean) : ProducerRecord[Array[Byte], Array[Byte]] = { + producerRecordWithExpectedTransactionStatus(topic, asBytes(key), asBytes(value), willBeCommitted) + } + + // Collect the current positions for all partition in the consumers current assignment. + def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) : Map[TopicPartition, OffsetAndMetadata] = { + val offsetsToCommit = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + consumer.assignment.foreach{ topicPartition => + offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition))) + } + offsetsToCommit.toMap + } + + def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = { + val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]() + TestUtils.waitUntilTrue(() => { + records ++= consumer.poll(50) + records.size >= numRecords + }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.") + records + } + } class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
