[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414777#comment-16414777 ]
ASF GitHub Bot commented on KAFKA-6054: --------------------------------------- mjsax closed pull request #4758: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4758 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index af10f61b5c4..a25868125e9 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -73,28 +73,50 @@ do fi done -for file in "$base_dir"/clients/build/libs/kafka-clients*.jar; -do - if should_include_file "$file"; then - CLASSPATH="$CLASSPATH":"$file" - fi -done +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + clients_lib_dir=$(dirname $0)/../clients/build/libs + streams_lib_dir=$(dirname $0)/../streams/build/libs + rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION} +else + clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs + streams_lib_dir=$clients_lib_dir + rocksdb_lib_dir=$streams_lib_dir +fi + -for file in "$base_dir"/streams/build/libs/kafka-streams*.jar; +for file in "$clients_lib_dir"/kafka-clients*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; +for file in "$streams_lib_dir"/kafka-streams*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; + do + if should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" + fi + done +else + VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'` + SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number + for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar; + do + if should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" + fi + done +fi + +for file in "$rocksdb_lib_dir"/rocksdb*.jar; do CLASSPATH="$CLASSPATH":"$file" done diff --git a/build.gradle b/build.gradle index 20a184c437c..5e97f901cb6 100644 --- a/build.gradle +++ b/build.gradle @@ -770,6 +770,30 @@ project(':streams:examples') { } } +project(':streams:upgrade-system-tests-0100') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0100" + + dependencies { + testCompile libs.kafkaStreams_0100 + } + + systemTestLibs { + dependsOn testJar + } +} + +project(':streams:upgrade-system-tests-0101') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0101" + + dependencies { + testCompile libs.kafkaStreams_0101 + } + + systemTestLibs { + dependsOn testJar + } +} + project(':log4j-appender') { archivesBaseName = "kafka-log4j-appender" diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 6094b547bb7..b80dfccf3d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -17,7 +17,9 @@ */ package org.apache.kafka.common.security.authenticator; -import java.util.Map; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; import javax.security.auth.Subject; import javax.security.auth.callback.Callback; @@ -26,10 +28,7 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; - -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.network.Mode; -import org.apache.kafka.common.security.auth.AuthCallbackHandler; +import java.util.Map; /** * Callback handler for Sasl clients. The callbacks required for the SASL mechanism diff --git a/docs/streams.html b/docs/streams.html index fe0e84ee3b7..d691e63a432 100644 --- a/docs/streams.html +++ b/docs/streams.html @@ -807,21 +807,50 @@ <h2><a id="streams_upgrade_and_api" href="#streams_upgrade_and_api">Upgrade Guid See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features. </p> + <p> + Upgrading from 0.10.0.x to 0.10.2.x directly is also possible. + See <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a> and <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a> + for a complete list of API changes. + Upgrading to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase + (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>). + As an alternative, and offline upgrade is also possible. + </p> + <ul> + <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from=</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li> + <li> bounce each instance of your application once </li> + <li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li> + <li> bounce each instance of your application once more to complete the upgrade </li> + </ul> + <p> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported) </p> + <ul> + <li> stop all old (0.10.0.x) application instances </li> + <li> update your code and swap old code and jar file with new code and new jar file </li> + <li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li> + </ul> + <p> If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams">Upgrade Section for 0.10.1</a>. It highlights incompatible changes you need to consider to upgrade your code and application. See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features. </p> - <h3><a id="streams_api_changes_01021" href="#streams_api_changes_0102">Notable changes in 0.10.2.1</a></h3> - <p> + <h3><a id="streams_api_changes_01022" href="#streams_api_changes_0102">Notable changes in 0.10.2.2</a></h3> + <p> + Parameter updates in <code>StreamsConfig</code>: + </p> + <ul> + <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li> + </ul> + + <h3><a id="streams_api_changes_01021" href="#streams_api_changes_0102">Notable changes in 0.10.2.1</a></h3> + <p> Parameter updates in <code>StreamsConfig</code>: </p> - <ul> + <ul> <li> of particular importance to improve the resiliency of a Kafka Streams application are two changes to default parameters of producer <code>retries</code> and consumer <code>max.poll.interval.ms</code> </li> - </ul> - <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3> + </ul> + <h3><a id="streams_api_changes_0102" href="#streams_api_changes_0102">Streams API changes in 0.10.2.0</a></h3> <p> New methods in <code>KafkaStreams</code>: </p> diff --git a/docs/upgrade.html b/docs/upgrade.html index d7581fa8dac..77477628f4d 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -61,6 +61,11 @@ <h5><a id="upgrade_1020_streams" href="#upgrade_1020_streams">Upgrading a 0.10.1 <li> See <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li> </ul> +<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5> +<ul> + <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li> +</ul> + <h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5> <ul> <li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code> default value was changed from 300000 to <code>Integer.MAX_VALUE</code>. @@ -141,6 +146,23 @@ <h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Upgrading a 0.10.0 <li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li> <li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details). Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li> + <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase + (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>). + As an alternative, and offline upgrade is also possible. + <ul> + <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li> + <li> bounce each instance of your application once </li> + <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li> + <li> bounce each instance of your application once more to complete the upgrade </li> + </ul> + </li> + <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported) + <ul> + <li> stop all old (0.10.0.x) application instances </li> + <li> update your code and swap old code and jar file with new code and new jar file </li> + <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li> + </ul> + </li> </ul> <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5> diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 25faa90a78f..4084b12216d 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -31,6 +31,8 @@ versions += [ jackson: "2.8.5", jetty: "9.2.22.v20170606", jersey: "2.24", + kafka_0100: "0.10.0.1", + kafka_0101: "0.10.1.1", log4j: "1.2.17", jopt: "5.0.3", junit: "4.12", @@ -92,6 +94,8 @@ libs += [ junit: "junit:junit:$versions.junit", log4j: "log4j:log4j:$versions.log4j", joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt", + kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100", + kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101", lz4: "net.jpountz.lz4:lz4:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", powermock: "org.powermock:powermock-module-junit4:$versions.powermock", diff --git a/settings.gradle b/settings.gradle index 29d38950a8a..576b40b9ce1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,5 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender', +include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100', + 'streams:upgrade-system-tests-0101', 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file' diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 0724571a0bc..3baa0785376 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -95,6 +95,16 @@ */ public static final String PRODUCER_PREFIX = "producer."; + /** + * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}. + */ + public static final String UPGRADE_FROM_0100 = "0.10.0"; + + /** {@code upgrade.from} */ + public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; + public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " + + "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x)."; + /** {@code state.dir} */ public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; @@ -383,7 +393,13 @@ 40 * 1000, atLeast(0), ConfigDef.Importance.MEDIUM, - REQUEST_TIMEOUT_MS_DOC); + REQUEST_TIMEOUT_MS_DOC) + .define(UPGRADE_FROM_CONFIG, + ConfigDef.Type.STRING, + null, + in(null, UPGRADE_FROM_0100), + ConfigDef.Importance.LOW, + UPGRADE_FROM_DOC); } // this is the list of configs for underlying clients @@ -501,6 +517,7 @@ public StreamsConfig(final Map<?, ?> props) { consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); // add configs required for stream partition assignor + consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG)); consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread); consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index a50a81914cf..889d2ff6a32 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; @@ -155,6 +154,8 @@ public int compare(TopicPartition p1, TopicPartition p2) { private String userEndPoint; private int numStandbyReplicas; + private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION; + private Cluster metadataWithInternalTopics; private Map<HostInfo, Set<TopicPartition>> partitionsByHostState; @@ -182,6 +183,12 @@ void time(final Time time) { public void configure(Map<String, ?> configs) { numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); + final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG); + if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) { + log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x."); + userMetadataVersion = 1; + } + Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE); if (o == null) { KafkaException ex = new KafkaException("StreamThread is not specified"); @@ -241,7 +248,7 @@ public Subscription subscription(Set<String> topics) { Set<TaskId> prevTasks = streamThread.prevTasks(); Set<TaskId> standbyTasks = streamThread.cachedTasks(); standbyTasks.removeAll(prevTasks); - SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPoint); + SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, prevTasks, standbyTasks, this.userEndPoint); if (streamThread.builder.sourceTopicPattern() != null) { SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); @@ -279,11 +286,16 @@ public Subscription subscription(Set<String> topics) { // construct the client metadata from the decoded subscription info Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>(); + int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION; for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) { String consumerId = entry.getKey(); Subscription subscription = entry.getValue(); SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()); + final int usedVersion = info.version; + if (usedVersion < minUserMetadataVersion) { + minUserMetadataVersion = usedVersion; + } // create the new client metadata if necessary ClientMetadata clientMetadata = clientsMetadata.get(info.processId); @@ -539,7 +551,7 @@ public Subscription subscription(Set<String> topics) { } // finally, encode the assignment before sending back to coordinator - assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode())); + assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode())); i++; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index ddbd67d8d88..7a6bf14d918 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.common.record.ByteBufferInputStream; @@ -56,7 +55,7 @@ public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> this(CURRENT_VERSION, activeTasks, standbyTasks, hostState); } - protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, + public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, Map<HostInfo, Set<TopicPartition>> hostState) { this.version = version; this.activeTasks = activeTasks; @@ -155,9 +154,7 @@ public static AssignmentInfo decode(ByteBuffer data) { } } - return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions); - - + return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions); } catch (IOException ex) { throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index c3481c05156..92c50a2a942 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.errors.TaskAssignmentException; @@ -32,7 +31,7 @@ private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); - private static final int CURRENT_VERSION = 2; + public static final int CURRENT_VERSION = 2; public final int version; public final UUID processId; @@ -44,7 +43,7 @@ public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> stand this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint); } - private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) { + public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) { this.version = version; this.processId = processId; this.prevTasks = prevTasks; diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 50ab1175cef..832883a0268 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -96,7 +96,7 @@ public boolean conditionMet() { } @Test - public void testStateCloseAfterCreate() throws Exception { + public void testStateCloseAfterCreate() { final KStreamBuilder builder = new KStreamBuilder(); final KafkaStreams streams = new KafkaStreams(builder, props); @@ -160,7 +160,7 @@ public void testStateThreadClose() throws Exception { // make sure we have the global state thread running too builder.globalTable("anyTopic", "anyStore"); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads); - final KafkaStreams streams = new KafkaStreams(builder, props); + new KafkaStreams(builder, props); testStateThreadCloseHelper(numThreads); } @@ -200,9 +200,8 @@ public boolean conditionMet() { } - @Test - public void testInitializesAndDestroysMetricsReporters() throws Exception { + public void testInitializesAndDestroysMetricsReporters() { final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final KStreamBuilder builder = new KStreamBuilder(); final KafkaStreams streams = new KafkaStreams(builder, props); @@ -217,7 +216,7 @@ public void testInitializesAndDestroysMetricsReporters() throws Exception { } @Test - public void testCloseIsIdempotent() throws Exception { + public void testCloseIsIdempotent() { streams.close(); final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); @@ -227,7 +226,7 @@ public void testCloseIsIdempotent() throws Exception { } @Test(expected = IllegalStateException.class) - public void testCannotStartOnceClosed() throws Exception { + public void testCannotStartOnceClosed() { streams.start(); streams.close(); try { @@ -241,7 +240,7 @@ public void testCannotStartOnceClosed() throws Exception { } @Test(expected = IllegalStateException.class) - public void testCannotStartTwice() throws Exception { + public void testCannotStartTwice() { streams.start(); try { @@ -267,10 +266,10 @@ public void testIllegalMetricsConfig() { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); final KStreamBuilder builder = new KStreamBuilder(); - final KafkaStreams streams = new KafkaStreams(builder, props); - + new KafkaStreams(builder, props); } @Test @@ -278,6 +277,7 @@ public void testLegalMetricsConfig() { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString()); final KStreamBuilder builder1 = new KStreamBuilder(); final KafkaStreams streams1 = new KafkaStreams(builder1, props); @@ -285,27 +285,26 @@ public void testLegalMetricsConfig() { props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString()); final KStreamBuilder builder2 = new KStreamBuilder(); - final KafkaStreams streams2 = new KafkaStreams(builder2, props); - + new KafkaStreams(builder2, props); } @Test(expected = IllegalStateException.class) - public void shouldNotGetAllTasksWhenNotRunning() throws Exception { + public void shouldNotGetAllTasksWhenNotRunning() { streams.allMetadata(); } @Test(expected = IllegalStateException.class) - public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception { + public void shouldNotGetAllTasksWithStoreWhenNotRunning() { streams.allMetadataForStore("store"); } @Test(expected = IllegalStateException.class) - public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception { + public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() { streams.metadataForKey("store", "key", Serdes.String().serializer()); } @Test(expected = IllegalStateException.class) - public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception { + public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() { streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() { @Override public Integer partition(final String key, final Object value, final int numPartitions) { @@ -321,6 +320,7 @@ public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Excepti final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final KStreamBuilder builder = new KStreamBuilder(); @@ -366,16 +366,18 @@ private KafkaStreams createKafkaStreams() { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); final KStreamBuilder builder = new KStreamBuilder(); return new KafkaStreams(builder, props); } @Test - public void testCleanup() throws Exception { + public void testCleanup() { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); final KStreamBuilder builder = new KStreamBuilder(); final KafkaStreams streams = new KafkaStreams(builder, props); @@ -391,6 +393,7 @@ public void testCannotCleanupWhileRunning() throws Exception { final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); final KStreamBuilder builder = new KStreamBuilder(); final KafkaStreams streams = new KafkaStreams(builder, props); @@ -448,6 +451,5 @@ public void onChange(final KafkaStreams.State newState, final KafkaStreams.State streams.close(); } } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 15cc1af8e5d..ab8701fa966 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -60,7 +60,7 @@ public void setUp() { } @Test - public void testGetProducerConfigs() throws Exception { + public void testGetProducerConfigs() { Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client"); assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer"); assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100"); @@ -68,7 +68,7 @@ public void testGetProducerConfigs() throws Exception { } @Test - public void testGetConsumerConfigs() throws Exception { + public void testGetConsumerConfigs() { Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client"); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer"); assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application"); @@ -77,7 +77,7 @@ public void testGetConsumerConfigs() throws Exception { } @Test - public void testGetRestoreConsumerConfigs() throws Exception { + public void testGetRestoreConsumerConfigs() { Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client"); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer"); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); @@ -86,7 +86,7 @@ public void testGetRestoreConsumerConfigs() throws Exception { @Test public void defaultSerdeShouldBeConfigured() { - Map<String, Object> serializerConfigs = new HashMap<String, Object>(); + Map<String, Object> serializerConfigs = new HashMap<>(); serializerConfigs.put("key.serializer.encoding", "UTF8"); serializerConfigs.put("value.serializer.encoding", "UTF-16"); Serializer<String> serializer = Serdes.String().serializer(); @@ -117,7 +117,7 @@ public void shouldSupportMultipleBootstrapServers() { } @Test - public void shouldSupportPrefixedConsumerConfigs() throws Exception { + public void shouldSupportPrefixedConsumerConfigs() { props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -127,7 +127,7 @@ public void shouldSupportPrefixedConsumerConfigs() throws Exception { } @Test - public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception { + public void shouldSupportPrefixedRestoreConsumerConfigs() { props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -137,7 +137,7 @@ public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception { } @Test - public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception { + public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); @@ -145,7 +145,7 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() thro } @Test - public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception { + public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); @@ -153,7 +153,7 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig } @Test - public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception { + public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(producerPrefix("interceptor.statsd.host"), "host"); final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId"); @@ -162,7 +162,7 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() thro @Test - public void shouldSupportPrefixedProducerConfigs() throws Exception { + public void shouldSupportPrefixedProducerConfigs() { props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10); props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -172,7 +172,7 @@ public void shouldSupportPrefixedProducerConfigs() throws Exception { } @Test - public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception { + public void shouldBeSupportNonPrefixedConsumerConfigs() { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -182,7 +182,7 @@ public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception { } @Test - public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception { + public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -192,7 +192,7 @@ public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception } @Test - public void shouldSupportNonPrefixedProducerConfigs() throws Exception { + public void shouldSupportNonPrefixedProducerConfigs() { props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -201,24 +201,22 @@ public void shouldSupportNonPrefixedProducerConfigs() throws Exception { assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } - - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception { + public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() { props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.keySerde(); } @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception { + public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() { props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.valueSerde(); } @Test - public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception { + public void shouldOverrideStreamsDefaultConsumerConfigs() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -228,7 +226,7 @@ public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception { } @Test - public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception { + public void shouldOverrideStreamsDefaultProducerConfigs() { props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000"); final StreamsConfig streamsConfig = new StreamsConfig(props); final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client"); @@ -236,7 +234,7 @@ public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception { } @Test - public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception { + public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); @@ -246,14 +244,14 @@ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throw } @Test(expected = ConfigException.class) - public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception { + public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.getConsumerConfigs(null, "a", "b"); } @Test(expected = ConfigException.class) - public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception { + public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); streamsConfig.getRestoreConsumerConfigs("client"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index 1d2a3e217e1..12568246e5d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.test.TestUtils; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -112,6 +113,7 @@ public void shouldFanoutTheInput() throws Exception { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 6a8c7ff10e1..3e0d80a6b19 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -10,6 +10,7 @@ */ package org.apache.kafka.streams.integration; +import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -44,8 +45,6 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; -import kafka.utils.MockTime; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -127,7 +126,7 @@ public void shouldReduce() throws Exception { List<KeyValue<String, String>> results = receiveMessages( new StringDeserializer(), new StringDeserializer(), - 5); + 5); Collections.sort(results, new Comparator<KeyValue<String, String>>() { @Override @@ -177,7 +176,7 @@ public String apply(Windowed<String> windowedKey, String value) { List<KeyValue<String, String>> windowedOutput = receiveMessages( new StringDeserializer(), new StringDeserializer(), - 10); + 10); Comparator<KeyValue<String, String>> comparator = @@ -229,7 +228,7 @@ public String apply(final Windowed<Integer> windowedKey, final Long value) { final List<KeyValue<String, Long>> results = receiveMessages( new StringDeserializer(), new LongDeserializer(), - 5); + 5); Collections.sort(results, new Comparator<KeyValue<String, Long>>() { @Override public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { @@ -303,6 +302,4 @@ private void startStreams() { } - - } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index bd5911d270b..13124f1cc5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -155,7 +155,7 @@ public void shouldReduce() throws Exception { final List<KeyValue<String, String>> results = receiveMessages( new StringDeserializer(), new StringDeserializer(), - 10); + 10); Collections.sort(results, new Comparator<KeyValue<String, String>>() { @Override @@ -209,7 +209,7 @@ public String apply(final Windowed<String> windowedKey, final String value) { final List<KeyValue<String, String>> windowedOutput = receiveMessages( new StringDeserializer(), new StringDeserializer(), - 15); + 15); final Comparator<KeyValue<String, String>> comparator = @@ -263,7 +263,7 @@ public void shouldAggregate() throws Exception { final List<KeyValue<String, Integer>> results = receiveMessages( new StringDeserializer(), new IntegerDeserializer(), - 10); + 10); Collections.sort(results, new Comparator<KeyValue<String, Integer>>() { @Override @@ -313,7 +313,7 @@ public String apply(final Windowed<String> windowedKey, final Integer value) { final List<KeyValue<String, Integer>> windowedMessages = receiveMessages( new StringDeserializer(), new IntegerDeserializer(), - 15); + 15); final Comparator<KeyValue<String, Integer>> comparator = @@ -364,7 +364,7 @@ public void shouldCount() throws Exception { final List<KeyValue<String, Long>> results = receiveMessages( new StringDeserializer(), new LongDeserializer(), - 10); + 10); Collections.sort(results, new Comparator<KeyValue<String, Long>>() { @Override public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { @@ -406,7 +406,7 @@ public String apply(final Windowed<Integer> windowedKey, final Long value) { final List<KeyValue<String, Long>> results = receiveMessages( new StringDeserializer(), new LongDeserializer(), - 10); + 10); Collections.sort(results, new Comparator<KeyValue<String, Long>>() { @Override public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 64e8459b274..d09d505b5e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -129,7 +129,7 @@ public void createTopics() throws InterruptedException { } @Before - public void before() throws IOException, InterruptedException { + public void before() throws Exception { testNo++; createTopics(); streamsConfiguration = new Properties(); @@ -609,15 +609,13 @@ private void verifyCanGetByKey(final String[] keys, * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false, * the method merely inserts the new found key into the list of * expected keys. - * @throws InterruptedException */ private void verifyGreaterOrEqual(final String[] keys, final Map<String, Long> expectedWindowedCount, final Map<String, Long> expectedCount, final ReadOnlyWindowStore<String, Long> windowStore, final ReadOnlyKeyValueStore<String, Long> keyValueStore, - final boolean failIfKeyNotFound) - throws InterruptedException { + final boolean failIfKeyNotFound) { final Map<String, Long> windowState = new HashMap<>(); final Map<String, Long> countState = new HashMap<>(); @@ -744,5 +742,4 @@ public void run() { } } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 6503038e5b3..e06ed739048 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Test; @@ -111,6 +112,7 @@ private Properties configProps() { { setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test"); setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint); + setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); } @@ -119,7 +121,7 @@ private Properties configProps() { @SuppressWarnings("unchecked") @Test - public void testSubscription() throws Exception { + public void testSubscription() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -159,7 +161,7 @@ public void testSubscription() throws Exception { } @Test - public void testAssignBasic() throws Exception { + public void testAssignBasic() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -227,7 +229,7 @@ public void testAssignBasic() throws Exception { } @Test - public void testAssignWithPartialTopology() throws Exception { + public void testAssignWithPartialTopology() { Properties props = configProps(); props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class); StreamsConfig config = new StreamsConfig(props); @@ -267,7 +269,7 @@ public void testAssignWithPartialTopology() throws Exception { @Test - public void testAssignEmptyMetadata() throws Exception { + public void testAssignEmptyMetadata() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); @@ -324,7 +326,7 @@ public void testAssignEmptyMetadata() throws Exception { } @Test - public void testAssignWithNewTasks() throws Exception { + public void testAssignWithNewTasks() { builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); @@ -381,7 +383,7 @@ public void testAssignWithNewTasks() throws Exception { } @Test - public void testAssignWithStates() throws Exception { + public void testAssignWithStates() { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addSource("source1", "topic1"); @@ -470,7 +472,7 @@ public void testAssignWithStates() throws Exception { } @Test - public void testAssignWithStandbyReplicas() throws Exception { + public void testAssignWithStandbyReplicas() { Properties props = configProps(); props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); StreamsConfig config = new StreamsConfig(props); @@ -543,7 +545,7 @@ public void testAssignWithStandbyReplicas() throws Exception { } @Test - public void testOnAssignment() throws Exception { + public void testOnAssignment() { TopicPartition t2p3 = new TopicPartition("topic2", 3); TopologyBuilder builder = new TopologyBuilder(); @@ -576,7 +578,7 @@ public void testOnAssignment() throws Exception { } @Test - public void testAssignWithInternalTopics() throws Exception { + public void testAssignWithInternalTopics() { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); @@ -612,7 +614,7 @@ public void testAssignWithInternalTopics() throws Exception { } @Test - public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception { + public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); @@ -650,7 +652,7 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throw } @Test - public void shouldAddUserDefinedEndPointToSubscription() throws Exception { + public void shouldAddUserDefinedEndPointToSubscription() { final Properties properties = configProps(); properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080"); final StreamsConfig config = new StreamsConfig(properties); @@ -663,8 +665,8 @@ public void shouldAddUserDefinedEndPointToSubscription() throws Exception { final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, + uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); @@ -673,7 +675,80 @@ public void shouldAddUserDefinedEndPointToSubscription() throws Exception { } @Test - public void shouldMapUserEndPointToTopicPartitions() throws Exception { + public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() { + final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + final Set<TaskId> emptyTasks = Collections.emptySet(); + subscriptions.put( + "consumer1", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + subscriptions.put( + "consumer2", + new PartitionAssignor.Subscription( + Collections.singletonList("topic1"), + new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode() + ) + ); + + final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + StreamsConfig config = new StreamsConfig(configProps()); + + final TopologyBuilder builder = new TopologyBuilder(); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + "appId", + "clientId", + UUID.randomUUID(), + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId")); + final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); + + assertEquals(2, assignment.size()); + assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version); + assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version); + } + + @Test + public void shouldDownGradeSubscription() { + final Properties properties = configProps(); + properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100); + StreamsConfig config = new StreamsConfig(properties); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + + String clientId = "client-id"; + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + "appId", + "clientId", + UUID.randomUUID(), + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", clientId)); + + PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1")); + + assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version); + } + + @Test + public void shouldMapUserEndPointToTopicPartitions() { final Properties properties = configProps(); final String myEndPoint = "localhost:8080"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); @@ -711,7 +786,7 @@ public void shouldMapUserEndPointToTopicPartitions() throws Exception { } @Test - public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception { + public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() { final Properties properties = configProps(); final String myEndPoint = "localhost"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); @@ -736,7 +811,7 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() thr } @Test - public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception { + public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() { final Properties properties = configProps(); final String myEndPoint = "localhost:j87yhk"; properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint); @@ -760,7 +835,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() th } @Test - public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception { + public void shouldExposeHostStateToTopicPartitionsOnAssignment() { List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0)); final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80), @@ -773,7 +848,7 @@ public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exceptio } @Test - public void shouldSetClusterMetadataOnAssignment() throws Exception { + public void shouldSetClusterMetadataOnAssignment() { final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0)); final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80), @@ -793,7 +868,7 @@ public void shouldSetClusterMetadataOnAssignment() throws Exception { } @Test - public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception { + public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() { final Cluster cluster = partitionAssignor.clusterMetadata(); assertNotNull(cluster); } @@ -891,11 +966,11 @@ public Object apply(Object value1, Object value2) { new TopicPartition(applicationId + "-count-repartition", 1), new TopicPartition(applicationId + "-count-repartition", 2) ); - assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment))); + assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment))); } @Test - public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception { + public void shouldUpdatePartitionHostInfoMapOnAssignment() { final TopicPartition partitionOne = new TopicPartition("topic", 1); final TopicPartition partitionTwo = new TopicPartition("topic", 2); final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap( @@ -912,7 +987,7 @@ public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception { } @Test - public void shouldUpdateClusterMetadataOnAssignment() throws Exception { + public void shouldUpdateClusterMetadataOnAssignment() { final TopicPartition topicOne = new TopicPartition("topic", 1); final TopicPartition topicTwo = new TopicPartition("topic2", 2); final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap( @@ -928,7 +1003,7 @@ public void shouldUpdateClusterMetadataOnAssignment() throws Exception { } @Test - public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception { + public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() { final Properties props = configProps(); props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); final StreamsConfig config = new StreamsConfig(props); @@ -976,12 +1051,12 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Except } @Test(expected = KafkaException.class) - public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception { + public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() { partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1)); } @Test(expected = KafkaException.class) - public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception { + public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() { final Map<String, Object> config = new HashMap<>(); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index cfa0e61dc85..52c753d5201 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -65,10 +65,9 @@ public void shouldDecodePreviousVersion() throws Exception { assertEquals(oldVersion.activeTasks, decoded.activeTasks); assertEquals(oldVersion.standbyTasks, decoded.standbyTasks); assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1 - assertEquals(2, decoded.version); // automatically upgraded to v2 on decode; + assertEquals(1, decoded.version); } - /** * This is a clone of what the V1 encoding did. The encode method has changed for V2 * so it is impossible to test compatibility without having this diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index d1921261bf7..9f59b11cc14 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -115,7 +115,7 @@ public boolean test(String key, Integer value) { } }); - data.process(SmokeTestUtil.printProcessorSupplier("data")); + data.process(SmokeTestUtil.<String, Integer>printProcessorSupplier("data")); // min KGroupedStream<String, Integer> @@ -141,7 +141,7 @@ public Integer apply(String aggKey, Integer value, Integer aggregate) { ).to(stringSerde, intSerde, "min"); KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName"); - minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min")); + minTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("min")); // max groupedData.aggregate( @@ -163,7 +163,7 @@ public Integer apply(String aggKey, Integer value, Integer aggregate) { ).to(stringSerde, intSerde, "max"); KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName"); - maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max")); + maxTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("max")); // sum groupedData.aggregate( @@ -186,7 +186,7 @@ public Long apply(String aggKey, Integer value, Long aggregate) { KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName"); - sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum")); + sumTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("sum")); // cnt groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt") @@ -195,7 +195,7 @@ public Long apply(String aggKey, Integer value, Long aggregate) { ).to(stringSerde, longSerde, "cnt"); KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName"); - cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt")); + cntTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("cnt")); // dif maxTable.join(minTable, diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index c2cfd847ccd..a0c2933e6fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -77,7 +77,6 @@ int next() { // This main() is not used by the system test. It is intended to be used for local debugging. public static void main(String[] args) throws Exception { final String kafka = "localhost:9092"; - final String zookeeper = "localhost:2181"; final File stateDir = TestUtils.tempDirectory(); final int numKeys = 20; @@ -131,42 +130,50 @@ public void run() { } public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception { + return generate(kafka, numKeys, maxRecordsPerKey, true); + } + public static Map<String, Set<Integer>> generate(final String kafka, + final int numKeys, + final int maxRecordsPerKey, + final boolean autoTerminate) throws Exception { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - // the next 4 config values make sure that all records are produced with no loss and - // no duplicates + // the next 2 config values make sure that all records are produced with no loss and no duplicates producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); int numRecordsProduced = 0; - Map<String, Set<Integer>> allData = new HashMap<>(); - ValueList[] data = new ValueList[numKeys]; + final Map<String, Set<Integer>> allData = new HashMap<>(); + final ValueList[] data = new ValueList[numKeys]; for (int i = 0; i < numKeys; i++) { data[i] = new ValueList(i, i + maxRecordsPerKey - 1); allData.put(data[i].key, new HashSet<Integer>()); } - Random rand = new Random(); + final Random rand = new Random(); - int remaining = data.length; + int remaining = 1; // dummy value must be positive if <autoTerminate> is false + if (autoTerminate) { + remaining = data.length; + } while (remaining > 0) { - int index = rand.nextInt(remaining); - String key = data[index].key; + final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys); + final String key = data[index].key; int value = data[index].next(); - if (value < 0) { + if (autoTerminate && value < 0) { remaining--; data[index] = data[remaining]; } else { - ProducerRecord<byte[], byte[]> record = - new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value)); + final ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value)); producer.send(record, new Callback() { @Override @@ -178,11 +185,12 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio } }); - numRecordsProduced++; allData.get(key).add(value); - if (numRecordsProduced % 100 == 0) + + if (numRecordsProduced % 100 == 0) { System.out.println(numRecordsProduced + " records produced"); + } Utils.sleep(2); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java index 73fe27c4659..87ab60c12ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java @@ -33,8 +33,6 @@ public class SmokeTestUtil { - public final static int WINDOW_SIZE = 100; - public final static long START_TIME = 60000L * 60 * 24 * 365 * 30; public final static int END = Integer.MAX_VALUE; public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) { @@ -46,18 +44,15 @@ public Processor<Object, Object> get() { return new AbstractProcessor<Object, Object>() { private int numRecordsProcessed = 0; - private ProcessorContext context; @Override public void init(ProcessorContext context) { System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; - this.context = context; } @Override public void process(Object key, Object value) { - if (printOffset) System.out.println(">>> " + context.offset()); numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); @@ -65,12 +60,10 @@ public void process(Object key, Object value) { } @Override - public void punctuate(long timestamp) { - } + public void punctuate(long timestamp) {} @Override - public void close() { - } + public void close() {} }; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java index 304cae7e0ad..aa1def1dd22 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -24,7 +24,7 @@ public class StreamsSmokeTest { /** - * args ::= command kafka zookeeper stateDir + * args ::= command kafka zookeeper stateDir disableAutoTerminate * command := "run" | "process" * * @param args @@ -33,11 +33,13 @@ public static void main(String[] args) throws Exception { String kafka = args[0]; String stateDir = args.length > 1 ? args[1] : null; String command = args.length > 2 ? args[2] : null; + boolean disableAutoTerminate = args.length > 3; - System.out.println("StreamsTest instance started"); + System.out.println("StreamsTest instance started (StreamsSmokeTest)"); System.out.println("command=" + command); System.out.println("kafka=" + kafka); System.out.println("stateDir=" + stateDir); + System.out.println("disableAutoTerminate=" + disableAutoTerminate); switch (command) { case "standalone": @@ -47,8 +49,12 @@ public static void main(String[] args) throws Exception { // this starts the driver (data generation and result verification) final int numKeys = 10; final int maxRecordsPerKey = 500; - Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey); - SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + if (disableAutoTerminate) { + SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false); + } else { + Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey); + SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + } break; case "process": // this starts a KafkaStreams client diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 00000000000..17ff97ea083 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + @SuppressWarnings("unchecked") + public static void main(final String[] args) { + if (args.length < 2) { + System.err.println("StreamsUpgradeTest requires two argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] : "")); + } + final String kafka = args[0]; + final String stateDir = args[1]; + final String upgradeFrom = args.length > 2 ? args[2] : null; + + System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)"); + System.out.println("kafka=" + kafka); + System.out.println("stateDir=" + stateDir); + System.out.println("upgradeFrom=" + upgradeFrom); + + final KStreamBuilder builder = new KStreamBuilder(); + + final KStream dataStream = builder.stream("data"); + dataStream.process(SmokeTestUtil.printProcessorSupplier("data")); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + if (upgradeFrom != null) { + config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom); + } + + + final KafkaStreams streams = new KafkaStreams(builder, config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } +} diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 00000000000..7d3ed436881 --- /dev/null +++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + @SuppressWarnings("unchecked") + public static void main(final String[] args) { + if (args.length < 3) { + System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "") + + (args.length > 1 ? args[1] : "")); + } + final String kafka = args[0]; + final String zookeeper = args[1]; + final String stateDir = args[2]; + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)"); + System.out.println("kafka=" + kafka); + System.out.println("zookeeper=" + zookeeper); + System.out.println("stateDir=" + stateDir); + + final KStreamBuilder builder = new KStreamBuilder(); + final KStream dataStream = builder.stream("data"); + dataStream.process(printProcessorSupplier()); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); + config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + + final KafkaStreams streams = new KafkaStreams(builder, config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } + + private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { + return new ProcessorSupplier<K, V>() { + public Processor<K, V> get() { + return new AbstractProcessor<K, V>() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } + } + + @Override + public void punctuate(final long timestamp) {} + + @Override + public void close() {} + }; + } + }; + } +} diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 00000000000..604fbe71ad3 --- /dev/null +++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + /** + * This test cannot be run executed, as long as Kafka 0.10.1.2 is not released + */ + @SuppressWarnings("unchecked") + public static void main(final String[] args) { + if (args.length < 3) { + System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: " + + (args.length > 0 ? args[0] + " " : "") + + (args.length > 1 ? args[1] : "")); + } + String kafka = args[0]; + String zookeeper = args[1]; + String stateDir = args[2]; + String upgradeFrom = args.length > 3 ? args[3] : null; + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)"); + System.out.println("kafka=" + kafka); + System.out.println("zookeeper=" + zookeeper); + System.out.println("stateDir=" + stateDir); + System.out.println("upgradeFrom=" + upgradeFrom); + + final KStreamBuilder builder = new KStreamBuilder(); + KStream dataStream = builder.stream("data"); + dataStream.process(printProcessorSupplier()); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); + config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + if (upgradeFrom != null) { + // TODO: because Kafka 0.10.1.2 is not released yet, thus `UPGRADE_FROM_CONFIG` is not available yet + //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom); + config.setProperty("upgrade.from", upgradeFrom); + } + + final KafkaStreams streams = new KafkaStreams(builder, config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + } + }); + } + + private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { + return new ProcessorSupplier<K, V>() { + public Processor<K, V> get() { + return new AbstractProcessor<K, V>() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } + } + + @Override + public void punctuate(final long timestamp) {} + + @Override + public void close() {} + }; + } + }; + } +} diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index e7be9475f79..b7de568ad7a 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -20,6 +20,7 @@ from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin +from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1 class StreamsTestBaseService(KafkaPathResolverMixin, Service): @@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid") + CLEAN_NODE_ENABLED = True + logs = { "streams_log": { "path": LOG_FILE, @@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): "streams_stderr": { "path": STDERR_FILE, "collect_default": True}, + "streams_log.0-1": { + "path": LOG_FILE + ".0-1", + "collect_default": True}, + "streams_stdout.0-1": { + "path": STDOUT_FILE + ".0-1", + "collect_default": True}, + "streams_stderr.0-1": { + "path": STDERR_FILE + ".0-1", + "collect_default": True}, + "streams_log.0-2": { + "path": LOG_FILE + ".0-2", + "collect_default": True}, + "streams_stdout.0-2": { + "path": STDOUT_FILE + ".0-2", + "collect_default": True}, + "streams_stderr.0-2": { + "path": STDERR_FILE + ".0-2", + "collect_default": True}, + "streams_log.0-3": { + "path": LOG_FILE + ".0-3", + "collect_default": True}, + "streams_stdout.0-3": { + "path": STDOUT_FILE + ".0-3", + "collect_default": True}, + "streams_stderr.0-3": { + "path": STDERR_FILE + ".0-3", + "collect_default": True}, + "streams_log.0-4": { + "path": LOG_FILE + ".0-4", + "collect_default": True}, + "streams_stdout.0-4": { + "path": STDOUT_FILE + ".0-4", + "collect_default": True}, + "streams_stderr.0-4": { + "path": STDERR_FILE + ".0-4", + "collect_default": True}, + "streams_log.0-5": { + "path": LOG_FILE + ".0-5", + "collect_default": True}, + "streams_stdout.0-5": { + "path": STDOUT_FILE + ".0-5", + "collect_default": True}, + "streams_stderr.0-5": { + "path": STDERR_FILE + ".0-5", + "collect_default": True}, + "streams_log.0-6": { + "path": LOG_FILE + ".0-6", + "collect_default": True}, + "streams_stdout.0-6": { + "path": STDOUT_FILE + ".0-6", + "collect_default": True}, + "streams_stderr.0-6": { + "path": STDERR_FILE + ".0-6", + "collect_default": True}, + "streams_log.1-1": { + "path": LOG_FILE + ".1-1", + "collect_default": True}, + "streams_stdout.1-1": { + "path": STDOUT_FILE + ".1-1", + "collect_default": True}, + "streams_stderr.1-1": { + "path": STDERR_FILE + ".1-1", + "collect_default": True}, + "streams_log.1-2": { + "path": LOG_FILE + ".1-2", + "collect_default": True}, + "streams_stdout.1-2": { + "path": STDOUT_FILE + ".1-2", + "collect_default": True}, + "streams_stderr.1-2": { + "path": STDERR_FILE + ".1-2", + "collect_default": True}, + "streams_log.1-3": { + "path": LOG_FILE + ".1-3", + "collect_default": True}, + "streams_stdout.1-3": { + "path": STDOUT_FILE + ".1-3", + "collect_default": True}, + "streams_stderr.1-3": { + "path": STDERR_FILE + ".1-3", + "collect_default": True}, + "streams_log.1-4": { + "path": LOG_FILE + ".1-4", + "collect_default": True}, + "streams_stdout.1-4": { + "path": STDOUT_FILE + ".1-4", + "collect_default": True}, + "streams_stderr.1-4": { + "path": STDERR_FILE + ".1-4", + "collect_default": True}, + "streams_log.1-5": { + "path": LOG_FILE + ".1-5", + "collect_default": True}, + "streams_stdout.1-5": { + "path": STDOUT_FILE + ".1-5", + "collect_default": True}, + "streams_stderr.1-5": { + "path": STDERR_FILE + ".1-5", + "collect_default": True}, + "streams_log.1-6": { + "path": LOG_FILE + ".1-6", + "collect_default": True}, + "streams_stdout.1-6": { + "path": STDOUT_FILE + ".1-6", + "collect_default": True}, + "streams_stderr.1-6": { + "path": STDERR_FILE + ".1-6", + "collect_default": True}, } def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None): @@ -107,7 +218,8 @@ def wait_node(self, node, timeout_sec=None): def clean_node(self, node): node.account.kill_process("streams", clean_shutdown=False, allow_fail=True) - node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) + if self.CLEAN_NODE_ENABLED: + node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) def start_cmd(self, node): args = self.args.copy() @@ -153,7 +265,28 @@ def __init__(self, test_context, kafka, command): class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService): def __init__(self, test_context, kafka): super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run") + self.DISABLE_AUTO_TERMINATE = "" + + def disable_auto_terminate(self): + self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate" + + def start_cmd(self, node): + args = self.args.copy() + args['kafka'] = self.kafka.bootstrap_servers() + args['state_dir'] = self.PERSISTENT_ROOT + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j'] = self.LOG4J_CONFIG_FILE + args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE + args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ + "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ + " %(kafka)s %(state_dir)s %(user_test_args)s %(disable_auto_terminate)s" \ + " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + + return cmd class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService): def __init__(self, test_context, kafka): @@ -171,3 +304,41 @@ def __init__(self, test_context, kafka): kafka, "org.apache.kafka.streams.tests.BrokerCompatibilityTest", "dummy") + +class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): + def __init__(self, test_context, kafka): + super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context, + kafka, + "org.apache.kafka.streams.tests.StreamsUpgradeTest", + "") + self.UPGRADE_FROM = "" + + def set_version(self, kafka_streams_version): + self.KAFKA_STREAMS_VERSION = kafka_streams_version + + def set_upgrade_from(self, upgrade_from): + self.UPGRADE_FROM = upgrade_from + + def start_cmd(self, node): + args = self.args.copy() + args['kafka'] = self.kafka.bootstrap_servers() + if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1): + args['zk'] = self.kafka.zk.connect_setting() + else: + args['zk'] = "" + args['state_dir'] = self.PERSISTENT_ROOT + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j'] = self.LOG4J_CONFIG_FILE + args['version'] = self.KAFKA_STREAMS_VERSION + args['upgrade_from'] = self.UPGRADE_FROM + args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) + + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ + "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \ + " %(kafka_run_class)s %(streams_class_name)s " \ + " %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s %(upgrade_from)s" \ + " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + + return cmd diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py new file mode 100644 index 00000000000..294e3544f79 --- /dev/null +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.mark import parametrize +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService +from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, DEV_VERSION +import random + +class StreamsUpgradeTest(KafkaTest): + """ + Test upgrading Kafka Streams (all version combination) + If metadata was changes, upgrade is more difficult + Metadata version was bumped in 0.10.1.0 + """ + + def __init__(self, test_context): + super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ + 'echo' : { 'partitions': 5 }, + 'data' : { 'partitions': 5 } + }) + + self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) + self.driver.disable_auto_terminate() + self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka) + self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka) + self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka) + + def test_simple_upgrade(self): + """ + Starts 3 KafkaStreams instances with version 0.10.1, and upgrades one-by-one to 0.10.2 + """ + + self.driver.start() + self.start_all_nodes_with(str(LATEST_0_10_1)) + + self.processors = [self.processor1, self.processor2, self.processor3] + + counter = 1 + random.seed() + + random.shuffle(self.processors) + for p in self.processors: + p.CLEAN_NODE_ENABLED = False + self.do_rolling_bounce(p, "", str(DEV_VERSION), counter) + counter = counter + 1 + + # shutdown + self.driver.stop() + self.driver.wait() + + random.shuffle(self.processors) + for p in self.processors: + node = p.node + with node.account.monitor_log(p.STDOUT_FILE) as monitor: + p.stop() + monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED", + timeout_sec=60, + err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) + + self.driver.stop() + + #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released + @parametrize(new_version=str(DEV_VERSION)) + def test_metadata_upgrade(self, new_version): + """ + Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version> + """ + + self.driver.start() + self.start_all_nodes_with(str(LATEST_0_10_0)) + + self.processors = [self.processor1, self.processor2, self.processor3] + + counter = 1 + random.seed() + + # first rolling bounce + random.shuffle(self.processors) + for p in self.processors: + p.CLEAN_NODE_ENABLED = False + self.do_rolling_bounce(p, "0.10.0", new_version, counter) + counter = counter + 1 + + # second rolling bounce + random.shuffle(self.processors) + for p in self.processors: + self.do_rolling_bounce(p, "", new_version, counter) + counter = counter + 1 + + # shutdown + self.driver.stop() + self.driver.wait() + + random.shuffle(self.processors) + for p in self.processors: + node = p.node + with node.account.monitor_log(p.STDOUT_FILE) as monitor: + p.stop() + monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED", + timeout_sec=60, + err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account)) + + self.driver.stop() + + def start_all_nodes_with(self, version): + # start first with <version> + self.prepare_for(self.processor1, version) + node1 = self.processor1.node + with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor: + with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor: + self.processor1.start() + log_monitor.wait_until("Kafka version : " + version, + timeout_sec=60, + err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) + monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) + + # start second with <version> + self.prepare_for(self.processor2, version) + node2 = self.processor2.node + with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: + with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: + with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor: + self.processor2.start() + log_monitor.wait_until("Kafka version : " + version, + timeout_sec=60, + err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account)) + first_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) + second_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account)) + + # start third with <version> + self.prepare_for(self.processor3, version) + node3 = self.processor3.node + with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: + with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: + with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor: + with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor: + self.processor3.start() + log_monitor.wait_until("Kafka version : " + version, + timeout_sec=60, + err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account)) + first_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account)) + second_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account)) + third_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account)) + + @staticmethod + def prepare_for(processor, version): + processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False) + processor.set_version(version) + + def do_rolling_bounce(self, processor, upgrade_from, new_version, counter): + first_other_processor = None + second_other_processor = None + for p in self.processors: + if p != processor: + if first_other_processor is None: + first_other_processor = p + else: + second_other_processor = p + + node = processor.node + first_other_node = first_other_processor.node + second_other_node = second_other_processor.node + + # stop processor and wait for rebalance of others + with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor: + with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor: + processor.stop() + first_other_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account)) + second_other_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account)) + node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) + + if upgrade_from == "": # upgrade disabled -- second round of rolling bounces + roll_counter = ".1-" # second round of rolling bounces + else: + roll_counter = ".0-" # first round of rolling boundes + + node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False) + node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False) + node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False) + + if new_version == str(DEV_VERSION): + processor.set_version("") # set to TRUNK + else: + processor.set_version(new_version) + processor.set_upgrade_from(upgrade_from) + + grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" " + with node.account.monitor_log(processor.STDOUT_FILE) as monitor: + with node.account.monitor_log(processor.LOG_FILE) as log_monitor: + with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor: + with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor: + processor.start() + + log_monitor.wait_until("Kafka version : " + new_version, + timeout_sec=60, + err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account)) + first_other_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account)) + found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True)) + if len(found) > 0: + raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'") + + second_other_monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account)) + found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True)) + if len(found) > 0: + raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'") + + monitor.wait_until("processed 100 records from topic", + timeout_sec=60, + err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account)) \ No newline at end of file diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 7cd489d87ac..df956027509 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -61,6 +61,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") +DEV_VERSION = KafkaVersion("0.10.2.2-SNAPSHOT") # 0.8.2.X versions V_0_8_2_1 = KafkaVersion("0.8.2.1") diff --git a/vagrant/base.sh b/vagrant/base.sh index 0bb0f30054b..70987c6c135 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -52,6 +52,8 @@ get_kafka() { kafka_dir=/opt/kafka-$version url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz + # the .tgz above does not include the streams test jar hence we need to get it separately + url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar if [ ! -d /opt/kafka-$version ]; then pushd /tmp curl -O $url ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > ----------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.1 > Reporter: James Cheng > Assignee: Matthias J. Sax > Priority: Major > Labels: kip > Fix For: 1.2.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade] > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. > We observed the following stack trace: > {code:java} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, > because the internal version number of the protocol changed when adding > Interactive Queries. Matthias asked me to file this JIRA> -- This message was sent by Atlassian JIRA (v7.6.3#76005)