Repository: kafka Updated Branches: refs/heads/0.11.0 c0e789bdc -> 6d04e412c
KAFKA-4923: Modify compatibility test for Exactly-Once Semantics in Streams - add broker compatibility system tests Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy, Eno Thereska, Guozhang Wang Closes #2974 from mjsax/kafka-4923-add-eos-to-streams-add-broker-check-and-system-test (cherry picked from commit 495836a4a27c2219f72a5f3776ba8e5216493b41) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6d04e412 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6d04e412 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6d04e412 Branch: refs/heads/0.11.0 Commit: 6d04e412cffabb1976aa935b4fb4767fd255836e Parents: c0e789b Author: Matthias J. Sax <[email protected]> Authored: Sun May 21 22:16:18 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Sun May 21 22:16:55 2017 -0700 ---------------------------------------------------------------------- checkstyle/checkstyle.xml | 2 +- .../org/apache/kafka/streams/KafkaStreams.java | 4 +- .../processor/internals/StreamsKafkaClient.java | 19 ++++++- .../streams/tests/BrokerCompatibilityTest.java | 21 +++++-- tests/kafkatest/services/streams.py | 4 +- .../streams_broker_compatibility_test.py | 59 +++++++++++++------- tests/kafkatest/version.py | 7 ++- vagrant/base.sh | 2 + 8 files changed, 86 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 9f9e9ae..5065972 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -109,7 +109,7 @@ </module> <module name="BooleanExpressionComplexity"> <!-- default is 3 --> - <property name="max" value="4"/> + <property name="max" value="5"/> </module> 
<module name="ClassFanOutComplexity"> http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 74f5fc1..3b801a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -73,6 +73,8 @@ import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG; /** * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and @@ -409,7 +411,7 @@ public class KafkaStreams { private void checkBrokerVersionCompatibility() throws StreamsException { final StreamsKafkaClient client = new StreamsKafkaClient(config); - client.checkBrokerCompatibility(); + client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG))); try { client.close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index c2b76f9..44f7900 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -61,6 
+61,9 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG; + public class StreamsKafkaClient { private static final ConfigDef CONFIG = StreamsConfig.configDef() @@ -310,7 +313,7 @@ public class StreamsKafkaClient { * * @throws StreamsException if brokers have version 0.10.0.x */ - public void checkBrokerCompatibility() throws StreamsException { + public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException { final ClientRequest clientRequest = kafkaClient.newClientRequest( getAnyReadyBrokerId(), new ApiVersionsRequest.Builder(), @@ -331,5 +334,19 @@ public class StreamsKafkaClient { if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) { throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher."); } + + if (eosEnabled && !brokerSupportsTransactions(apiVersionsResponse)) { + throw new StreamsException("Setting " + PROCESSING_GUARANTEE_CONFIG + "=" + EXACTLY_ONCE + " requires broker version 0.11.0.x or higher."); + } } + + private boolean brokerSupportsTransactions(final ApiVersionsResponse apiVersionsResponse) { + return apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) != null + && apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) != null + && apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) != null + && apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) != null + && apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) != null + && apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) != null; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index c04b3d1..0af2594 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -33,6 +34,7 @@ import org.apache.kafka.test.TestUtils; import java.io.File; import java.util.Collections; +import java.util.Locale; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -41,11 +43,12 @@ public class BrokerCompatibilityTest { private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic"; private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic"; - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { System.out.println("StreamsTest instance started"); final String kafka = args.length > 0 ? args[0] : "localhost:9092"; final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath(); + final boolean eosEnabled = args.length > 2 ? 
Boolean.parseBoolean(args[2]) : false; final File stateDir = new File(stateDirStr); stateDir.mkdir(); @@ -58,6 +61,9 @@ public class BrokerCompatibilityTest { streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + if (eosEnabled) { + streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + } final int timeout = 6000; streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout); streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout); @@ -70,7 +76,7 @@ public class BrokerCompatibilityTest { final KafkaStreams streams = new KafkaStreams(builder, streamsProperties); streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override - public void uncaughtException(Thread t, Throwable e) { + public void uncaughtException(final Thread t, final Throwable e) { System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e); streams.close(30, TimeUnit.SECONDS); @@ -91,27 +97,30 @@ public class BrokerCompatibilityTest { System.out.println("wait for result"); - loopUntilRecordReceived(kafka); + loopUntilRecordReceived(kafka, eosEnabled); System.out.println("close Kafka Streams"); streams.close(); } - private static void loopUntilRecordReceived(final String kafka) { + private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) { final Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer"); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + if (eosEnabled) { + consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)); + } final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties); consumer.subscribe(Collections.singletonList(SINK_TOPIC)); while (true) { - ConsumerRecords<String, String> records = consumer.poll(100); - for (ConsumerRecord<String, String> record : records) { + final ConsumerRecords<String, String> records = consumer.poll(100); + for (final ConsumerRecord<String, String> record : records) { if (record.key().equals("key") && record.value().equals("value")) { consumer.close(); return; http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/tests/kafkatest/services/streams.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index e7be947..905320a 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -166,8 +166,8 @@ class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService): class StreamsBrokerCompatibilityService(StreamsTestBaseService): - def __init__(self, test_context, kafka): + def __init__(self, test_context, kafka, eosEnabled): super(StreamsBrokerCompatibilityService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.BrokerCompatibilityTest", - "dummy") + eosEnabled) http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index 
a5bdbc6..c4b554e 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -21,14 +21,16 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsBrokerCompatibilityService from kafkatest.services.verifiable_consumer import VerifiableConsumer from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.version import DEV_BRANCH, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion +from kafkatest.version import DEV_BRANCH, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion class StreamsBrokerCompatibility(Test): """ - These tests validate that Streams v0.10.2+ can connect to older brokers v0.10.1+ - and that Streams fails fast for 0.10.0 brokers - and that Streams times-out for pre-0.10.0 brokers + These tests validates that + - Streams 0.11+ w/ EOS fails fast for older brokers 0.10.2 and 0.10.1 + - Streams 0.11+ w/o EOS works for older brokers 0.10.2 and 0.10.1 + - Streams fails fast for 0.10.0 brokers + - Streams times-out for pre-0.10.0 brokers """ input = "brokerCompatibilitySourceTopic" @@ -36,7 +38,6 @@ class StreamsBrokerCompatibility(Test): def __init__(self, test_context): super(StreamsBrokerCompatibility, self).__init__(test_context=test_context) - self.zk = ZookeeperService(test_context, num_nodes=1) self.kafka = KafkaService(test_context, num_nodes=1, @@ -45,9 +46,6 @@ class StreamsBrokerCompatibility(Test): self.input: {'partitions': 1, 'replication-factor': 1}, self.output: {'partitions': 1, 'replication-factor': 1} }) - - self.processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka) - self.consumer = VerifiableConsumer(test_context, 1, self.kafka, @@ -57,16 +55,35 @@ class StreamsBrokerCompatibility(Test): def setUp(self): self.zk.start() - @parametrize(broker_version=str(DEV_BRANCH)) + 
@parametrize(broker_version=str(LATEST_0_10_2)) @parametrize(broker_version=str(LATEST_0_10_1)) - def test_compatible_brokers(self, broker_version): + def test_fail_fast_on_incompatible_brokers_if_eos_enabled(self, broker_version): self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.start() - self.processor.start() + processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True) + processor.start() + + processor.node.account.ssh(processor.start_cmd(processor.node)) + with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: + monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.', + timeout_sec=60, + err_msg="Never saw 'EOS requires broker version 0.11+' error message " + str(processor.node.account)) + + self.kafka.stop() + + @parametrize(broker_version=str(LATEST_0_10_2)) + @parametrize(broker_version=str(LATEST_0_10_1)) + def test_compatible_brokers_eos_disabled(self, broker_version): + self.kafka.set_version(KafkaVersion(broker_version)) + self.kafka.start() + + processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False) + processor.start() + self.consumer.start() - self.processor.wait() + processor.wait() wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.") @@ -78,13 +95,14 @@ class StreamsBrokerCompatibility(Test): self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.start() - self.processor.start() + processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False) + processor.start() - self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node)) - with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor: + processor.node.account.ssh(processor.start_cmd(processor.node)) + with 
processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Kafka Streams requires broker version 0.10.1.x or higher.', timeout_sec=60, - err_msg="Never saw 'incompatible broker' error message " + str(self.processor.node.account)) + err_msg="Never saw 'Streams requires broker verion 0.10.1+' error message " + str(processor.node.account)) self.kafka.stop() @@ -94,12 +112,13 @@ class StreamsBrokerCompatibility(Test): self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.start() - self.processor.start() + processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False) + processor.start() - self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node)) - with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor: + processor.node.account.ssh(processor.start_cmd(processor.node)) + with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.', timeout_sec=60, - err_msg="Never saw 'no available broker' error message " + str(self.processor.node.account)) + err_msg="Never saw 'no available brokers' error message " + str(processor.node.account)) self.kafka.stop() http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/tests/kafkatest/version.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 7cd489d..8e1497c 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -82,4 +82,9 @@ V_0_10_1_0 = KafkaVersion("0.10.1.0") V_0_10_1_1 = KafkaVersion("0.10.1.1") LATEST_0_10_1 = V_0_10_1_1 -LATEST_0_10 = LATEST_0_10_1 +# 0.10.2.x versions +V_0_10_2_0 = KafkaVersion("0.10.2.0") +V_0_10_2_1 = KafkaVersion("0.10.2.1") +LATEST_0_10_2 = 
V_0_10_2_1 + +LATEST_0_10 = LATEST_0_10_2 http://git-wip-us.apache.org/repos/asf/kafka/blob/6d04e412/vagrant/base.sh ---------------------------------------------------------------------- diff --git a/vagrant/base.sh b/vagrant/base.sh index 28dcf69..100891b 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -91,6 +91,8 @@ get_kafka 0.10.0.1 2.11 chmod a+rw /opt/kafka-0.10.0.1 get_kafka 0.10.1.1 2.11 chmod a+rw /opt/kafka-0.10.1.1 +get_kafka 0.10.2.1 +chmod a+rw /opt/kafka-0.10.2.1 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
