Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b42cefe2f -> dc15062ea
KAFKA-5281; System tests for transactions

Author: Apurva Mehta <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3149 from apurvam/KAFKA-5281-transactions-system-tests

(cherry picked from commit 1959835d9e148f0eb6407b36ff96b334d5e785cb)
Signed-off-by: Jason Gustafson <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc15062e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc15062e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc15062e

Branch: refs/heads/0.11.0
Commit: dc15062ea9c8f9db864e6114819966daa40f6a45
Parents: b42cefe
Author: Apurva Mehta <[email protected]>
Authored: Thu Jun 1 10:25:29 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Thu Jun 1 10:27:01 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  |   4 +-
 .../common/requests/OffsetCommitResponse.java   |   1 +
 .../requests/TxnOffsetCommitResponse.java       |   1 +
 ...nsactionMarkerRequestCompletionHandler.scala |   3 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |  12 +-
 tests/kafkatest/services/console_consumer.py    |   9 +-
 .../services/transactional_message_copier.py    | 183 ++++++++++++
 tests/kafkatest/tests/core/transactions_test.py | 207 +++++++++++++
 tests/kafkatest/version.py                      |   5 +
 .../kafka/tools/TransactionalMessageCopier.java | 287 +++++++++++++++++++
 10 files changed, 706 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 221816c..11068a7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -928,7 +928,9 @@ public class TransactionManager {
             Errors error = entry.getValue();
             if (error == Errors.NONE) {
                 pendingTxnOffsetCommits.remove(topicPartition);
-            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
+            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR
+                    || error == Errors.REQUEST_TIMED_OUT) {
                 hadFailure = true;
                 if (!coordinatorReloaded) {
                     coordinatorReloaded = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 06e5608..782ffa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -44,6 +44,7 @@ public class OffsetCommitResponse extends AbstractResponse {
      * Possible error codes:
      *
      * UNKNOWN_TOPIC_OR_PARTITION (3)
+     * REQUEST_TIMED_OUT (7)
      * OFFSET_METADATA_TOO_LARGE (12)
      * COORDINATOR_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
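The client-side change above extends the set of errors treated as retriable for transactional offset commits: REQUEST_TIMED_OUT now triggers coordinator re-discovery and a resend, just like the two coordinator errors. As a minimal sketch of that classification, using the public org.apache.kafka.common.protocol.Errors enum (the helper class and method names here are hypothetical, for illustration only):

    import java.util.EnumSet;
    import java.util.Set;
    import org.apache.kafka.common.protocol.Errors;

    // Hypothetical helper mirroring the patch: these errors mark a
    // TxnOffsetCommit response as retriable, so the producer re-discovers
    // the group coordinator and resends instead of failing the transaction.
    final class TxnOffsetCommitErrors {
        private static final Set<Errors> RETRIABLE = EnumSet.of(
                Errors.COORDINATOR_NOT_AVAILABLE,
                Errors.NOT_COORDINATOR,
                Errors.REQUEST_TIMED_OUT);   // newly added by this commit

        static boolean shouldRetry(Errors error) {
            return error != Errors.NONE && RETRIABLE.contains(error);
        }
    }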
http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index e7b349c..4c0f010 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -42,6 +42,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
     //   GroupAuthorizationFailed
     //   InvalidCommitOffsetSize
     //   TransactionalIdAuthorizationFailed
+    //   RequestTimedOut

     private final Map<TopicPartition, Errors> errors;
     private final int throttleTimeMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 5fa6035..da40001 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -139,7 +139,8 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                 case Errors.UNKNOWN_TOPIC_OR_PARTITION |
                      Errors.NOT_LEADER_FOR_PARTITION |
                      Errors.NOT_ENOUGH_REPLICAS |
-                     Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => // these are retriable errors
+                     Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
+                     Errors.REQUEST_TIMED_OUT => // these are retriable errors

                   info(s"Sending $transactionalId's transaction marker for partition $topicPartition has failed with error ${error.exceptionName}, retrying " +
                     s"with current coordinator epoch ${epochAndMetadata.coordinatorEpoch}")

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 193a344..335c724 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -200,7 +200,7 @@ object ConsoleConsumer extends Logging {
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-
+    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
     props
   }

@@ -264,7 +264,7 @@ object ConsoleConsumer extends Logging {
       "skip it instead of halt.")
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be output here")
       .withRequiredArg
       .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
@@ -284,6 +284,13 @@ object ConsoleConsumer extends Logging {
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
       "Log lifecycle events of the consumer in addition to logging consumed " +
       "messages. (This is specific for system tests.)")
+    val isolationLevelOpt = parser.accepts("isolation-level",
+      "Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted " +
+      "to read all messages.")
+      .withRequiredArg()
+      .ofType(classOf[String])
+      .defaultsTo("read_uncommitted")
+
     if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")

@@ -314,6 +321,7 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
+    val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]

     if (keyDeserializer != null && !keyDeserializer.isEmpty) {
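The new --isolation-level flag is passed straight through to the underlying consumer's isolation.level property. A minimal sketch of the equivalent plain-Java consumer configuration (the bootstrap address, group id, and topic name below are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // read_committed hides records of open or aborted transactions;
            // read_uncommitted (the default) returns everything.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("output-topic"));     // placeholder
                System.out.println(consumer.poll(1000).count() + " committed records");
            }
        }
    }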
""" JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or []) BackgroundThreadService.__init__(self, context, num_nodes) @@ -140,6 +142,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) self.log_level = "TRACE" self.stop_timeout_sec = stop_timeout_sec + self.isolation_level = isolation_level self.enable_systest_events = enable_systest_events if self.enable_systest_events: # Only available in 0.10.0 and up @@ -190,6 +193,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) if node.version <= LATEST_0_10_0: cmd += " --new-consumer" cmd += " --bootstrap-server %(broker_list)s" % args + if node.version >= V_0_11_0_0: + cmd += " --isolation-level %s" % self.isolation_level else: cmd += " --zookeeper %(zk_connect)s" % args if self.from_beginning: http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/tests/kafkatest/services/transactional_message_copier.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py new file mode 100644 index 0000000..153e02c --- /dev/null +++ b/tests/kafkatest/services/transactional_message_copier.py @@ -0,0 +1,183 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import json +import signal + +from ducktape.utils.util import wait_until +from ducktape.services.background_thread import BackgroundThreadService +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin +from ducktape.cluster.remoteaccount import RemoteCommandError + +class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService): + """This service wraps org.apache.kafka.tools.TransactionalMessageCopier for + use in system testing. 
+ """ + PERSISTENT_ROOT = "/mnt/transactional_message_copier" + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "transactional_message_copier.stderr") + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "transactional_message_copier.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + + logs = { + "transactional_message_copier_stdout": { + "path": STDOUT_CAPTURE, + "collect_default": True}, + "transactional_message_copier_stderr": { + "path": STDERR_CAPTURE, + "collect_default": True}, + "transactional_message_copier_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, num_nodes, kafka, transactional_id, consumer_group, + input_topic, input_partition, output_topic, max_messages = -1, + transaction_size = 1000, log_level="INFO"): + super(TransactionalMessageCopier, self).__init__(context, num_nodes) + self.log_level = log_level + self.kafka = kafka + self.transactional_id = transactional_id + self.consumer_group = consumer_group + self.transaction_size = transaction_size + self.input_topic = input_topic + self.input_partition = input_partition + self.output_topic = output_topic + self.max_messages = max_messages + self.message_copy_finished = False + self.consumed = -1 + self.remaining = -1 + self.stop_timeout_sec = 60 + + def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % TransactionalMessageCopier.PERSISTENT_ROOT, + allow_fail=False) + # Create and upload log properties + log_config = self.render('tools_log4j.properties', + log_file=TransactionalMessageCopier.LOG_FILE) + node.account.create_file(TransactionalMessageCopier.LOG4J_CONFIG, log_config) + # Configure security + self.security_config = self.kafka.security_config.client_config(node=node) + self.security_config.setup_node(node) + cmd = self.start_cmd(node, idx) + self.logger.debug("TransactionalMessageCopier %d command: %s" % (idx, cmd)) + try: + for line in node.account.ssh_capture(cmd): + line = line.strip() + data = self.try_parse_json(line) + if data is not None: + with self.lock: + self.remaining = int(data["remaining"]) + self.consumed = int(data["consumed"]) + self.logger.info("%s: consumed %d, remaining %d" % + (self.transactional_id, self.consumed, self.remaining)) + if "shutdown_complete" in data: + if self.remaining == 0: + # We are only finished if the remaining + # messages at the time of shutdown is 0. + # + # Otherwise a clean shutdown would still print + # a 'shutdown complete' messages even though + # there are unprocessed messages, causing + # tests to fail. + self.logger.info("%s : Finished message copy" % self.transactional_id) + self.message_copy_finished = True + else: + self.logger.info("%s : Shut down without finishing message copy." %\ + self.transactional_id) + except RemoteCommandError as e: + self.logger.debug("Got exception while reading output from copier, \ + probably because it was SIGKILL'd (exit code 137): %s" % str(e)) + + def start_cmd(self, node, idx): + cmd = "export LOG_DIR=%s;" % TransactionalMessageCopier.LOG_DIR + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % TransactionalMessageCopier.LOG4J_CONFIG + cmd += self.path.script("kafka-run-class.sh", node) + " org.apache.kafka.tools." 
+ "TransactionalMessageCopier" + cmd += " --broker-list %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol) + cmd += " --transactional-id %s" % self.transactional_id + cmd += " --consumer-group %s" % self.consumer_group + cmd += " --input-topic %s" % self.input_topic + cmd += " --output-topic %s" % self.output_topic + cmd += " --input-partition %s" % str(self.input_partition) + cmd += " --transaction-size %s" % str(self.transaction_size) + if self.max_messages > 0: + cmd += " --max-messages %s" % str(self.max_messages) + cmd += " 2>> %s | tee -a %s &" % (TransactionalMessageCopier.STDERR_CAPTURE, TransactionalMessageCopier.STDOUT_CAPTURE) + + return cmd + + def clean_node(self, node, clean_shutdown=True): + self.kill_node(node, clean_shutdown) + node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) + self.security_config.clean_node(node) + + def pids(self, node): + try: + cmd = "ps ax | grep -i TransactionalMessageCopier | grep java | grep -v grep | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except (RemoteCommandError, ValueError) as e: + self.logger.error("Could not list pids: %s" % str(e)) + return [] + + def alive(self, node): + return len(self.pids(node)) > 0 + + def kill_node(self, node, clean_shutdown=True): + pids = self.pids(node) + sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL + for pid in pids: + node.account.signal(pid, sig) + wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60, err_msg="Message Copier failed to stop") + + def stop_node(self, node, clean_shutdown=True): + self.kill_node(node, clean_shutdown) + stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) + assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ + (str(node.account), str(self.stop_timeout_sec)) + + def restart(self, clean_shutdown): + if self.is_done: + return + node = self.nodes[0] + with self.lock: + self.consumed = -1 + self.remaining = -1 + self.stop_node(node, clean_shutdown) + self.start_node(node) + + def try_parse_json(self, string): + """Try to parse a string as json. Return None if not parseable.""" + try: + record = json.loads(string) + return record + except ValueError: + self.logger.debug("Could not parse as json: %s" % str(string)) + return None + + @property + def is_done(self): + return self.message_copy_finished + + def progress_percent(self): + with self.lock: + if self.remaining < 0: + return 0 + if self.consumed + self.remaining == 0: + return 100 + return (float(self.consumed)/float(self.consumed + self.remaining)) * 100 http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/tests/kafkatest/tests/core/transactions_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py new file mode 100644 index 0000000..a98a1c9 --- /dev/null +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -0,0 +1,207 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/tests/kafkatest/tests/core/transactions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
new file mode 100644
index 0000000..a98a1c9
--- /dev/null
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.transactional_message_copier import TransactionalMessageCopier
+from kafkatest.utils import is_int
+
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+
+class TransactionsTest(Test):
+    """Tests transactions by transactionally copying data from a source topic to
+    a destination topic, killing the copy processes as well as the brokers
+    randomly throughout the process. In the end we verify that the final output
+    topic contains exactly one committed copy of each message in the input
+    topic.
+    """
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(TransactionsTest, self).__init__(test_context=test_context)
+
+        self.input_topic = "input-topic"
+        self.output_topic = "output-topic"
+
+        self.num_brokers = 3
+
+        # Test parameters
+        self.num_input_partitions = 2
+        self.num_output_partitions = 3
+        self.num_seed_messages = 20000
+        self.transaction_size = 500
+        self.first_transactional_id = "my-first-transactional-id"
+        self.second_transactional_id = "my-second-transactional-id"
+        self.consumer_group = "transactions-test-consumer-group"
+
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context,
+                                  num_nodes=self.num_brokers,
+                                  zk=self.zk,
+                                  topics = {
+                                      self.input_topic: {
+                                          "partitions": self.num_input_partitions,
+                                          "replication-factor": 3,
+                                          "configs": {
+                                              "min.insync.replicas": 2
+                                          }
+                                      },
+                                      self.output_topic: {
+                                          "partitions": self.num_output_partitions,
+                                          "replication-factor": 3,
+                                          "configs": {
+                                              "min.insync.replicas": 2
+                                          }
+                                      }
+                                  })
+
+    def setUp(self):
+        self.zk.start()
+
+    def seed_messages(self):
+        seed_timeout_sec = 10000
+        seed_producer = VerifiableProducer(context=self.test_context,
+                                           num_nodes=1,
+                                           kafka=self.kafka,
+                                           topic=self.input_topic,
+                                           message_validator=is_int,
+                                           max_messages=self.num_seed_messages,
+                                           enable_idempotence=True)
+        seed_producer.start()
+        wait_until(lambda: seed_producer.num_acked >= self.num_seed_messages,
+                   timeout_sec=seed_timeout_sec,
+                   err_msg="Producer failed to produce %d messages in %ds." %\
+                   (self.num_seed_messages, seed_timeout_sec))
+        return seed_producer.acked
+
+    def get_messages_from_output_topic(self):
+        consumer = ConsoleConsumer(context=self.test_context,
+                                   num_nodes=1,
+                                   kafka=self.kafka,
+                                   topic=self.output_topic,
+                                   new_consumer=True,
+                                   message_validator=is_int,
+                                   from_beginning=True,
+                                   consumer_timeout_ms=5000,
+                                   isolation_level="read_committed")
+        consumer.start()
+        # ensure that the consumer is up.
+        wait_until(lambda: consumer.alive(consumer.nodes[0]) == True,
+                   timeout_sec=60,
+                   err_msg="Consumer failed to start for %ds" %\
+                   60)
+        # wait until the consumer closes, which will be 5 seconds after
+        # receiving the last message.
+        wait_until(lambda: consumer.alive(consumer.nodes[0]) == False,
+                   timeout_sec=60,
+                   err_msg="Consumer failed to consume %d messages in %ds" %\
+                   (self.num_seed_messages, 60))
+        return consumer.messages_consumed[1]
+
+    def bounce_brokers(self, clean_shutdown):
+        for node in self.kafka.nodes:
+            if clean_shutdown:
+                self.kafka.restart_node(node, clean_shutdown = True)
+            else:
+                self.kafka.stop_node(node, clean_shutdown = False)
+                wait_until(lambda: len(self.kafka.pids(node)) == 0 and not self.kafka.is_registered(node),
+                           timeout_sec=self.kafka.zk_session_timeout + 5,
+                           err_msg="Failed to see timely deregistration of \
+                           hard-killed broker %s" % str(node.account))
+                self.kafka.start_node(node)
+
+    def create_and_start_message_copier(self, input_partition, transactional_id):
+        message_copier = TransactionalMessageCopier(
+            context=self.test_context,
+            num_nodes=1,
+            kafka=self.kafka,
+            transactional_id=transactional_id,
+            consumer_group=self.consumer_group,
+            input_topic=self.input_topic,
+            input_partition=input_partition,
+            output_topic=self.output_topic,
+            max_messages=-1,
+            transaction_size=self.transaction_size
+        )
+        message_copier.start()
+        wait_until(lambda: message_copier.alive(message_copier.nodes[0]),
+                   timeout_sec=10,
+                   err_msg="Message copier failed to start after 10 s")
+        return message_copier
+
+    def bounce_copiers(self, copiers, clean_shutdown):
+        for _ in range(3):
+            for copier in copiers:
+                wait_until(lambda: copier.progress_percent() >= 20.0,
+                           timeout_sec=30,
+                           err_msg="%s : Message copier didn't make enough progress in 30s. Current progress: %s" \
+                           % (copier.transactional_id, str(copier.progress_percent())))
+                self.logger.info("%s - progress: %s" % (copier.transactional_id,
+                                                        str(copier.progress_percent())))
+                copier.restart(clean_shutdown)
+
+    def create_and_start_copiers(self):
+        copiers = []
+        copiers.append(self.create_and_start_message_copier(
+            input_partition=0,
+            transactional_id=self.first_transactional_id
+        ))
+        copiers.append(self.create_and_start_message_copier(
+            input_partition=1,
+            transactional_id=self.second_transactional_id
+        ))
+        return copiers
+
+    def copy_messages_transactionally(self, failure_mode, bounce_target):
+        copiers = self.create_and_start_copiers()
+        clean_shutdown = False
+        if failure_mode == "clean_bounce":
+            clean_shutdown = True
+
+        if bounce_target == "brokers":
+            self.bounce_brokers(clean_shutdown)
+        elif bounce_target == "clients":
+            self.bounce_copiers(copiers, clean_shutdown)
+
+        for copier in copiers:
+            wait_until(lambda: copier.is_done,
+                       timeout_sec=60,
+                       err_msg="%s - Failed to copy all messages in %ds." %\
+                       (copier.transactional_id, 60))
+        self.logger.info("finished copying messages")
+
+    @cluster(num_nodes=8)
+    @matrix(failure_mode=["clean_bounce", "hard_bounce"],
+            bounce_target=["brokers", "clients"])
+    def test_transactions(self, failure_mode, bounce_target):
+        security_protocol = 'PLAINTEXT'
+        self.kafka.security_protocol = security_protocol
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.start()
+        input_messages = self.seed_messages()
+        self.copy_messages_transactionally(failure_mode, bounce_target)
+        output_messages = self.get_messages_from_output_topic()
+        output_message_set = set(output_messages)
+        input_message_set = set(input_messages)
+        num_dups = abs(len(output_messages) - len(output_message_set))
+        assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups
+        assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\
+            (len(input_message_set), len(output_message_set))
http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 8e1497c..f63a7c1 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -88,3 +88,8 @@ V_0_10_2_1 = KafkaVersion("0.10.2.1")
 LATEST_0_10_2 = V_0_10_2_1

 LATEST_0_10 = LATEST_0_10_2
+
+# 0.11.0.0 versions
+V_0_11_0_0 = KafkaVersion("0.11.0.0")
+LATEST_0_11_0 = V_0_11_0_0
+LATEST_0_11 = LATEST_0_11_0

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc15062e/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
new file mode 100644
index 0000000..c79c854
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * This class is primarily meant for use with system tests. It copies messages from an input partition to an output
+ * topic transactionally, committing the offsets and messages together.
+ */
+public class TransactionalMessageCopier {
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("transactional-message-copier")
+                .defaultHelp(true)
+                .description("This tool copies messages transactionally from an input partition to an output topic, " +
+                        "committing the consumed offsets along with the output messages");
+
+        parser.addArgument("--input-topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("INPUT-TOPIC")
+                .dest("inputTopic")
+                .help("Consume messages from this topic");
+
+        parser.addArgument("--input-partition")
+                .action(store())
+                .required(true)
+                .type(Integer.class)
+                .metavar("INPUT-PARTITION")
+                .dest("inputPartition")
+                .help("Consume messages from this partition of the input topic.");
+
+        parser.addArgument("--output-topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("OUTPUT-TOPIC")
+                .dest("outputTopic")
+                .help("Produce messages to this topic");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Process this many messages, up to the end offset at the time this program was launched. If set to -1 " +
+                        "we will just read to the end offset of the input partition (as of the time the program was launched).");
+
+        parser.addArgument("--consumer-group")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(String.class)
+                .metavar("CONSUMER-GROUP")
+                .dest("consumerGroup")
+                .help("The consumer group id to use for storing the consumer offsets.");
+
+        parser.addArgument("--transaction-size")
+                .action(store())
+                .required(false)
+                .setDefault(200)
+                .type(Integer.class)
+                .metavar("TRANSACTION-SIZE")
+                .dest("messagesPerTransaction")
+                .help("The number of messages to put in each transaction. Default is 200.");
+
+        parser.addArgument("--transactional-id")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TRANSACTIONAL-ID")
+                .dest("transactionalId")
+                .help("The transactionalId to assign to the producer");
+
+        return parser;
+    }
+
+    private static KafkaProducer<String, String> createProducer(Namespace parsedArgs) {
+        String transactionalId = parsedArgs.getString("transactionalId");
+        String brokerList = parsedArgs.getString("brokerList");
+
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+
+        return new KafkaProducer<>(props);
+    }
+
+    private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs, TopicPartition inputPartition) {
+        String consumerGroup = parsedArgs.getString("consumerGroup");
+        String brokerList = parsedArgs.getString("brokerList");
+        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
+
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numMessagesPerTransaction.toString());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+        return consumer;
+    }
+
+    private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String topic, ConsumerRecord<String, String> record) {
+        return new ProducerRecord<>(topic, record.key(), record.value());
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
+        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
+        for (TopicPartition topicPartition : consumer.assignment()) {
+            positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+        }
+        return positions;
+    }
+
+    private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {
+        long currentPosition = consumer.position(partition);
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Arrays.asList(partition));
+        if (endOffsets.containsKey(partition)) {
+            return endOffsets.get(partition) - currentPosition;
+        }
+        return 0;
+    }
+
+    private static String toJsonString(Map<String, Object> data) {
+        String json;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            json = mapper.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            json = "Bad data can't be written as json: " + e.getMessage();
+        }
+        return json;
+    }
+
+    private static String statusAsJson(long consumed, long remaining, String transactionalId) {
+        Map<String, Object> statusData = new HashMap<>();
+        statusData.put("progress", transactionalId);
+        statusData.put("consumed", consumed);
+        statusData.put("remaining", remaining);
+        return toJsonString(statusData);
+    }
+
+    private static String shutDownString(long consumed, long remaining, String transactionalId) {
+        Map<String, Object> shutdownData = new HashMap<>();
+        shutdownData.put("remaining", remaining);
+        shutdownData.put("consumed", consumed);
+        shutdownData.put("shutdown_complete", transactionalId);
+        return toJsonString(shutdownData);
+    }
+
+    public static void main(String[] args) throws IOException {
+        Namespace parsedArgs = argParser().parseArgsOrFail(args);
+        Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");
+        final String transactionalId = parsedArgs.getString("transactionalId");
+        final String outputTopic = parsedArgs.getString("outputTopic");
+
+        String consumerGroup = parsedArgs.getString("consumerGroup");
+        TopicPartition inputPartition = new TopicPartition(parsedArgs.getString("inputTopic"), parsedArgs.getInt("inputPartition"));
+
+        final KafkaProducer<String, String> producer = createProducer(parsedArgs);
+        final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs, inputPartition);
+
+        consumer.assign(Arrays.asList(inputPartition));
+
+        long maxMessages = parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages");
+        maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages);
+
+        producer.initTransactions();
+
+        final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+        final AtomicLong remainingMessages = new AtomicLong(maxMessages);
+        final AtomicLong numMessagesProcessed = new AtomicLong(0);
+        int exitCode = 0;
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                isShuttingDown.set(true);
+                // Flush any remaining messages
+                producer.close();
+                synchronized (consumer) {
+                    consumer.close();
+                }
+                System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+            }
+        });
+
+        try {
+            while (0 < remainingMessages.get()) {
+                if ((((double) numMessagesProcessed.get() / maxMessages) * 100) % 10 == 0) {
+                    // print status for every 10% we progress.
+                    System.out.println(statusAsJson(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+                }
+                if (isShuttingDown.get())
+                    break;
+                int messagesInCurrentTransaction = 0;
+                long numMessagesForNextTransaction = Math.min(numMessagesPerTransaction, remainingMessages.get());
+                producer.beginTransaction();
+
+                while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
+                    ConsumerRecords<String, String> records = consumer.poll(200L);
+                    for (ConsumerRecord<String, String> record : records) {
+                        producer.send(producerRecordFromConsumerRecord(outputTopic, record));
+                        messagesInCurrentTransaction++;
+                    }
+                }
+                producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
+                producer.commitTransaction();
+                remainingMessages.set(maxMessages - numMessagesProcessed.addAndGet(messagesInCurrentTransaction));
+            }
+        } finally {
+            producer.close();
+            synchronized (consumer) {
+                consumer.close();
+            }
+        }
+        System.exit(exitCode);
+    }
+}
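The tool above packages the canonical read-process-write transaction pattern: consume with read_committed, produce the copies, and commit the input offsets inside the same transaction. Stripped of argument parsing, progress reporting, and shutdown handling, the core loop looks like this minimal sketch (topic names, group id, transactional id, and bootstrap address are placeholders; abort-and-retry error handling is omitted for brevity):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;

    public class MinimalTransactionalCopy {
        public static void main(String[] args) {
            Properties cprops = new Properties();
            cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            cprops.put(ConsumerConfig.GROUP_ID_CONFIG, "copy-group");              // placeholder
            cprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            cprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
            consumer.assign(Collections.singletonList(new TopicPartition("input-topic", 0)));

            Properties pprops = new Properties();
            pprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            pprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "copy-txn-id");     // placeholder
            pprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            pprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(pprops);
            producer.initTransactions();  // fences zombies sharing the same transactional.id

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200L);
                if (records.isEmpty())
                    continue;
                producer.beginTransaction();
                for (ConsumerRecord<String, String> record : records)
                    producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
                // Commit the input offsets inside the same transaction so the
                // copied records and the consumer position succeed or fail atomically.
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : consumer.assignment())
                    offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                producer.sendOffsetsToTransaction(offsets, "copy-group");
                producer.commitTransaction();
            }
        }
    }

Unlike this sketch, the real tool bounds the copy by the input partition's end offset at startup and reports progress as JSON, which is what lets the system test bounce it at arbitrary points and still verify exactly-once delivery.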
