This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 86de2df MINOR: code cleanup (#6057)
86de2df is described below
commit 86de2dfd27f96dd3fea0635ad110c63bc639c721
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 9 15:04:52 2019 +0100
MINOR: code cleanup (#6057)
Reviewers: Bill Bejeck <[email protected]>, John Roesler <[email protected]>, Guozhang Wang <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 267 +++++++++------------
.../GlobalKTableEOSIntegrationTest.java | 126 ++++------
.../integration/GlobalKTableIntegrationTest.java | 102 +++-----
.../integration/GlobalThreadShutDownOrderTest.java | 68 ++----
.../KTableSourceTopicRestartIntegrationTest.java | 31 +--
.../PurgeRepartitionTopicIntegrationTest.java | 71 +++---
.../integration/QueryableStateIntegrationTest.java | 254 +++++++++++++-------
7 files changed, 442 insertions(+), 477 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index bdfbb3b..505d454 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -40,7 +41,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
@@ -49,6 +49,7 @@ import org.junit.experimental.categories.Category;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -98,7 +99,7 @@ public class EosIntegrationTest {
private int testNumber = 0;
@Before
- public void createTopics() throws InterruptedException {
+ public void createTopics() throws Exception {
applicationId = "appId-" + ++testNumber;
CLUSTER.deleteTopicsAndWait(
SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
@@ -154,26 +155,24 @@ public class EosIntegrationTest {
output.to(outputTopic);
for (int i = 0; i < numberOfRestarts; ++i) {
- final KafkaStreams streams = new KafkaStreams(
- builder.build(),
- StreamsTestUtils.getStreamsConfig(
- applicationId,
- CLUSTER.bootstrapServers(),
- Serdes.LongSerde.class.getName(),
- Serdes.LongSerde.class.getName(),
- new Properties() {
- {
- put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
- put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
- put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
- put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
- }
- }));
+ final Properties config = StreamsTestUtils.getStreamsConfig(
+ applicationId,
+ CLUSTER.bootstrapServers(),
+ Serdes.LongSerde.class.getName(),
+ Serdes.LongSerde.class.getName(),
+ new Properties() {
+ {
+ put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+ put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+ }
+ });
- try {
+ try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
streams.start();
final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L);
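Aside on the pattern in this hunk: the explicit try/finally around streams.close() becomes a try-with-resources block, which compiles because KafkaStreams implements AutoCloseable (as the new code above relies on). A minimal, runnable sketch of the equivalence; the Resource class below is illustrative only, not part of this patch:

    public class TryWithResourcesSketch {
        static final class Resource implements AutoCloseable {
            @Override
            public void close() {
                System.out.println("close() runs automatically");
            }
        }

        public static void main(final String[] args) {
            // Equivalent to: final Resource r = new Resource();
            // try { /* body */ } finally { r.close(); }
            try (final Resource r = new Resource()) {
                System.out.println("body runs; close() is guaranteed even if this throws");
            }
        }
    }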
@@ -192,18 +191,15 @@ public class EosIntegrationTest {
CONSUMER_GROUP_ID,
LongDeserializer.class,
LongDeserializer.class,
- new Properties() {
- {
- put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
- }
- }),
+ Utils.mkProperties(Collections.singletonMap(
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+ IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))
+ ),
outputTopic,
inputData.size()
);
checkResultPerKey(committedRecords, inputData);
- } finally {
- streams.close();
}
}
}
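For context on the Utils.mkProperties(Collections.singletonMap(...)) call in the hunk above: it replaces double-brace Properties initialization, which creates an anonymous Properties subclass that captures the enclosing test instance. A sketch of what the helper is assumed to do, namely copy string entries into a fresh Properties object (see org.apache.kafka.common.utils.Utils for the actual implementation; this class is illustrative):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    public class MkPropertiesSketch {
        // Assumed behavior of Utils.mkProperties: copy a Map<String, String>
        // into a plain Properties object, avoiding the anonymous subclass.
        static Properties mkProperties(final Map<String, String> map) {
            final Properties properties = new Properties();
            for (final Map.Entry<String, String> entry : map.entrySet()) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
            return properties;
        }

        public static void main(final String[] args) {
            final Properties props = mkProperties(Collections.singletonMap("isolation.level", "read_committed"));
            System.out.println(props); // {isolation.level=read_committed}
        }
    }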
@@ -220,13 +216,15 @@ public class EosIntegrationTest {
}
- private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>> records) {
+ private void addAllKeys(final Set<Long> allKeys,
+ final List<KeyValue<Long, Long>> records) {
for (final KeyValue<Long, Long> record : records) {
allKeys.add(record.key);
}
}
- private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key, final List<KeyValue<Long, Long>> records) {
+ private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key,
+ final List<KeyValue<Long, Long>> records) {
final List<KeyValue<Long, Long>> recordsPerKey = new ArrayList<>(records.size());
for (final KeyValue<Long, Long> record : records) {
@@ -243,24 +241,21 @@ public class EosIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
- final KafkaStreams streams = new KafkaStreams(
- builder.build(),
- StreamsTestUtils.getStreamsConfig(
- applicationId,
- CLUSTER.bootstrapServers(),
- Serdes.LongSerde.class.getName(),
- Serdes.LongSerde.class.getName(),
- new Properties() {
- {
- put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
- put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
- put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- }
- }));
-
- try {
+ final Properties properties = new Properties();
+ properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ final Properties config = StreamsTestUtils.getStreamsConfig(
+ applicationId,
+ CLUSTER.bootstrapServers(),
+ Serdes.LongSerde.class.getName(),
+ Serdes.LongSerde.class.getName(),
+ properties);
+
+ try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
streams.start();
final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
@@ -315,8 +310,6 @@ public class EosIntegrationTest {
);
assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
- } finally {
- streams.close();
}
}
@@ -330,8 +323,7 @@ public class EosIntegrationTest {
// -> the failure only kills one thread
// after fail over, we should read 40 committed records (even if 50 records got written)
- final KafkaStreams streams = getKafkaStreams(false, "appDir", 2);
- try {
+ try (final KafkaStreams streams = getKafkaStreams(false, "appDir", 2))
{
streams.start();
final List<KeyValue<Long, Long>> committedDataBeforeFailure =
prepareData(0L, 10L, 0L, 1L);
@@ -345,12 +337,9 @@ public class EosIntegrationTest {
writeInputData(committedDataBeforeFailure);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return commitRequested.get() == 2;
- }
- }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+ TestUtils.waitForCondition(
+ () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+ "SteamsTasks did not request commit.");
writeInputData(uncommittedDataBeforeFailure);
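The waitForCondition rewrites in this hunk (and throughout the patch) work because TestCondition declares a single abstract method, so each anonymous class collapses to a lambda. A self-contained sketch under that assumption; the interface shape here mirrors org.apache.kafka.test.TestCondition but is redeclared locally for illustration:

    import java.util.concurrent.atomic.AtomicInteger;

    public class TestConditionSketch {
        // Same single-method shape as org.apache.kafka.test.TestCondition:
        @FunctionalInterface
        interface TestCondition {
            boolean conditionMet();
        }

        public static void main(final String[] args) {
            final AtomicInteger commitRequested = new AtomicInteger(2);

            // Before: anonymous class spelling out conditionMet()
            final TestCondition before = new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return commitRequested.get() == 2;
                }
            };

            // After: the same condition as a lambda
            final TestCondition after = () -> commitRequested.get() == 2;

            System.out.println(before.conditionMet() && after.conditionMet()); // true
        }
    }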
@@ -363,12 +352,9 @@ public class EosIntegrationTest {
errorInjected.set(true);
writeInputData(dataAfterFailure);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return uncaughtException != null;
- }
- }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one
StreamThread.");
+ TestUtils.waitForCondition(
+ () -> uncaughtException != null, MAX_WAIT_TIME_MS,
+ "Should receive uncaught exception from one StreamThread.");
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
@@ -389,8 +375,6 @@ public class EosIntegrationTest {
checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
- } finally {
- streams.close();
}
}
@@ -407,8 +391,7 @@ public class EosIntegrationTest {
// after fail over, we should read 40 committed records and the state stores should contain the correct sums
// per key (even if some records got processed twice)
- final KafkaStreams streams = getKafkaStreams(true, "appDir", 2);
- try {
+ try (final KafkaStreams streams = getKafkaStreams(true, "appDir", 2)) {
streams.start();
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -422,12 +405,9 @@ public class EosIntegrationTest {
writeInputData(committedDataBeforeFailure);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return commitRequested.get() == 2;
- }
- }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+ TestUtils.waitForCondition(
+ () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+ "SteamsTasks did not request commit.");
writeInputData(uncommittedDataBeforeFailure);
@@ -442,12 +422,9 @@ public class EosIntegrationTest {
errorInjected.set(true);
writeInputData(dataAfterFailure);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return uncaughtException != null;
- }
- }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one
StreamThread.");
+ TestUtils.waitForCondition(
+ () -> uncaughtException != null, MAX_WAIT_TIME_MS,
+ "Should receive uncaught exception from one StreamThread.");
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
@@ -465,11 +442,11 @@ public class EosIntegrationTest {
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
checkResultPerKey(allCommittedRecords, expectedResult);
- checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+ checkResultPerKey(
+ committedRecordsAfterFailure,
+ expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
verifyStateStore(streams, getMaxPerKey(expectedResult));
- } finally {
- streams.close();
}
}
@@ -486,9 +463,10 @@ public class EosIntegrationTest {
// afterwards, the "stalling" thread resumes, and another rebalance
should get triggered
// we write the remaining 20 records and verify to read 60 result
records
- final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
- final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1);
- try {
+ try (
+ final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
+ final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1)
+ ) {
streams1.start();
streams2.start();
@@ -505,12 +483,9 @@ public class EosIntegrationTest {
writeInputData(committedDataBeforeGC);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return commitRequested.get() == 2;
- }
- }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+ TestUtils.waitForCondition(
+ () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+ "SteamsTasks did not request commit.");
writeInputData(uncommittedDataBeforeGC);
@@ -523,14 +498,12 @@ public class EosIntegrationTest {
gcInjected.set(true);
writeInputData(dataToTriggerFirstRebalance);
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams1.allMetadata().size() == 1 &&
streams2.allMetadata().size() == 1 &&
-
(streams1.allMetadata().iterator().next().topicPartitions().size() == 2
- ||
streams2.allMetadata().iterator().next().topicPartitions().size() == 2);
- }
- }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+ TestUtils.waitForCondition(
+ () -> streams1.allMetadata().size() == 1
+ && streams2.allMetadata().size() == 1
+ &&
(streams1.allMetadata().iterator().next().topicPartitions().size() == 2
+ ||
streams2.allMetadata().iterator().next().topicPartitions().size() == 2),
+ MAX_WAIT_TIME_MS, "Should have rebalanced.");
final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
@@ -543,14 +516,13 @@ public class EosIntegrationTest {
checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
doGC = false;
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1
- && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
- && streams2.allMetadata().iterator().next().topicPartitions().size() == 1;
- }
- }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+ TestUtils.waitForCondition(
+ () -> streams1.allMetadata().size() == 1
+ && streams2.allMetadata().size() == 1
+ && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
+ && streams2.allMetadata().iterator().next().topicPartitions().size() == 1,
+ MAX_WAIT_TIME_MS,
+ "Should have rebalanced.");
writeInputData(dataAfterSecondRebalance);
@@ -566,13 +538,12 @@ public class EosIntegrationTest {
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
- } finally {
- streams1.close();
- streams2.close();
}
}
- private List<KeyValue<Long, Long>> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) {
+ private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
+ final long toExclusive,
+ final Long... keys) {
final List<KeyValue<Long, Long>> data = new ArrayList<>();
for (final Long k : keys) {
@@ -584,7 +555,9 @@ public class EosIntegrationTest {
return data;
}
- private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
+ private KafkaStreams getKafkaStreams(final boolean withState,
+ final String appDir,
+ final int numberOfStreamsThreads) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
gcInjected = new AtomicBoolean(false);
@@ -619,7 +592,8 @@ public class EosIntegrationTest {
}
@Override
- public KeyValue<Long, Long> transform(final Long key, final Long value) {
+ public KeyValue<Long, Long> transform(final Long key,
+ final Long value) {
if (gcInjected.compareAndSet(true, false)) {
while (doGC) {
try {
@@ -666,38 +640,34 @@ public class EosIntegrationTest {
} }, storeNames)
.to(SINGLE_PARTITION_OUTPUT_TOPIC);
- final KafkaStreams streams = new KafkaStreams(
- builder.build(),
- StreamsTestUtils.getStreamsConfig(
- applicationId,
- CLUSTER.bootstrapServers(),
- Serdes.LongSerde.class.getName(),
- Serdes.LongSerde.class.getName(),
- new Properties() {
- {
- put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
- put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
- put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
- put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
- put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
- put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
- put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
- put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
- put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142");
- }
- }));
-
- streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- if (uncaughtException != null) {
- e.printStackTrace(System.err);
- fail("Should only get one uncaught exception from
Streams.");
- }
- uncaughtException = e;
+ final Properties properties = new Properties();
+ properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
+ properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+ properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+ properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+ properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
+ properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+ properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+ properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+ properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142");
+
+ final Properties config = StreamsTestUtils.getStreamsConfig(
+ applicationId,
+ CLUSTER.bootstrapServers(),
+ Serdes.LongSerde.class.getName(),
+ Serdes.LongSerde.class.getName(),
+ properties);
+
+ final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+ streams.setUncaughtExceptionHandler((t, e) -> {
+ if (uncaughtException != null) {
+ e.printStackTrace(System.err);
+ fail("Should only get one uncaught exception from Streams.");
}
+ uncaughtException = e;
});
return streams;
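The handler above is another anonymous-class-to-lambda conversion: Thread.UncaughtExceptionHandler declares a single method, uncaughtException(Thread, Throwable), so the two-argument lambda (t, e) -> { ... } is equivalent. A runnable, standalone sketch of the mechanism (not part of the patch):

    public class UncaughtHandlerSketch {
        public static void main(final String[] args) throws InterruptedException {
            final Thread worker = new Thread(() -> {
                throw new IllegalStateException("boom");
            });
            // Two-argument lambda in place of an anonymous UncaughtExceptionHandler:
            worker.setUncaughtExceptionHandler((thread, error) ->
                System.err.println(thread.getName() + " died with: " + error));
            worker.start();
            worker.join();
        }
    }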
@@ -713,7 +683,7 @@ public class EosIntegrationTest {
}
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
- final String groupId) throws InterruptedException {
+ final String groupId) throws Exception {
if (groupId != null) {
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
@@ -778,13 +748,14 @@ public class EosIntegrationTest {
return expectedResult;
}
- private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
+ private void verifyStateStore(final KafkaStreams streams,
+ final Set<KeyValue<Long, Long>> expectedStoreContent) {
ReadOnlyKeyValueStore<Long, Long> store = null;
final long maxWaitingTime = System.currentTimeMillis() + 300000L;
while (System.currentTimeMillis() < maxWaitingTime) {
try {
- store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
+ store = streams.store(storeName, QueryableStoreTypes.keyValueStore());
break;
} catch (final InvalidStateStoreException okJustRetry) {
try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 084519e..787cb29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -31,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -49,7 +48,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@@ -72,18 +70,8 @@ public class GlobalKTableEOSIntegrationTest {
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
- private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
- @Override
- public Long apply(final String key, final Long value) {
- return value;
- }
- };
- private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
- @Override
- public String apply(final Long value1, final String value2) {
- return value1 + "+" + value2;
- }
- };
+ private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
+ private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
private final String globalStore = "globalStore";
private final Map<String, String> results = new HashMap<>();
private StreamsBuilder builder;
@@ -96,7 +84,7 @@ public class GlobalKTableEOSIntegrationTest {
private ForeachAction<String, String> foreachAction;
@Before
- public void before() throws InterruptedException {
+ public void before() throws Exception {
testNo++;
builder = new StreamsBuilder();
createTopics();
@@ -116,16 +104,11 @@ public class GlobalKTableEOSIntegrationTest {
.withValueSerde(Serdes.String()));
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
stream = builder.stream(streamTopic, stringLongConsumed);
- foreachAction = new ForeachAction<String, String>() {
- @Override
- public void apply(final String key, final String value) {
- results.put(key, value);
- }
- };
+ foreachAction = results::put;
}
@After
- public void whenShuttingDown() throws IOException {
+ public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
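The foreachAction change in the hunk above uses a method reference: ForeachAction<String, String> has the same two-argument, void-returning shape as put on the results map, so results::put stands in for the anonymous class. A sketch using java.util.function.BiConsumer, which has the same shape (BiConsumer is a stand-in here, not the interface the test actually uses):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;

    public class MethodReferenceSketch {
        public static void main(final String[] args) {
            final Map<String, String> results = new HashMap<>();
            // was: new ForeachAction<...>() { public void apply(k, v) { results.put(k, v); } }
            final BiConsumer<String, String> action = results::put;
            action.accept("a", "1+A");
            System.out.println(results); // {a=1+A}
        }
    }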
@@ -147,24 +130,22 @@ public class GlobalKTableEOSIntegrationTest {
expected.put("d", "4+D");
expected.put("e", "5+null");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for initial values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for initial values");
produceGlobalTableValues();
- final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+
+ TestUtils.waitForCondition(
+ () -> "J".equals(replicatedStore.get(5L)),
+ 30000,
+ "waiting for data in replicated store");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "J".equals(replicatedStore.get(5L));
- }
- }, 30000, "waiting for data in replicated store");
produceTopicValues(streamTopic);
expected.put("a", "1+F");
@@ -173,12 +154,10 @@ public class GlobalKTableEOSIntegrationTest {
expected.put("d", "4+I");
expected.put("e", "5+J");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for final values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for final values");
}
@Test
@@ -195,24 +174,21 @@ public class GlobalKTableEOSIntegrationTest {
expected.put("c", "3+C");
expected.put("d", "4+D");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for initial values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for initial values");
produceGlobalTableValues();
- final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "J".equals(replicatedStore.get(5L));
- }
- }, 30000, "waiting for data in replicated store");
+ TestUtils.waitForCondition(
+ () -> "J".equals(replicatedStore.get(5L)),
+ 30000,
+ "waiting for data in replicated store");
produceTopicValues(streamTopic);
@@ -222,12 +198,10 @@ public class GlobalKTableEOSIntegrationTest {
expected.put("d", "4+I");
expected.put("e", "5+J");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for final values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for final values");
}
@Test
@@ -242,12 +216,11 @@ public class GlobalKTableEOSIntegrationTest {
expected.put(3L, "C");
expected.put(4L, "D");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- ReadOnlyKeyValueStore<Long, String> store = null;
+ TestUtils.waitForCondition(
+ () -> {
+ final ReadOnlyKeyValueStore<Long, String> store;
try {
- store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
} catch (final InvalidStateStoreException ex) {
return false;
}
@@ -258,8 +231,9 @@ public class GlobalKTableEOSIntegrationTest {
result.put(kv.key, kv.value);
}
return result.equals(expected);
- }
- }, 30000L, "waiting for initial values");
+ },
+ 30000L,
+ "waiting for initial values");
}
@Test
@@ -276,12 +250,11 @@ public class GlobalKTableEOSIntegrationTest {
expected.put(3L, "C");
expected.put(4L, "D");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- ReadOnlyKeyValueStore<Long, String> store = null;
+ TestUtils.waitForCondition(
+ () -> {
+ final ReadOnlyKeyValueStore<Long, String> store;
try {
- store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
} catch (final InvalidStateStoreException ex) {
return false;
}
@@ -292,11 +265,12 @@ public class GlobalKTableEOSIntegrationTest {
result.put(kv.key, kv.value);
}
return result.equals(expected);
- }
- }, 30000L, "waiting for initial values");
+ },
+ 30000L,
+ "waiting for initial values");
}
- private void createTopics() throws InterruptedException {
+ private void createTopics() throws Exception {
streamTopic = "stream-" + testNo;
globalTableTopic = "globalTable-" + testNo;
CLUSTER.createTopics(streamTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 013e2b6..2190d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -48,7 +47,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -62,23 +60,12 @@ public class GlobalKTableIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static volatile int testNo = 0;
private final MockTime mockTime = CLUSTER.time;
- private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
- @Override
- public Long apply(final String key, final Long value) {
- return value;
- }
- };
- private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
- @Override
- public String apply(final Long value1, final String value2) {
- return value1 + "+" + value2;
- }
- };
+ private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
+ private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
private final String globalStore = "globalStore";
private final Map<String, String> results = new HashMap<>();
private StreamsBuilder builder;
@@ -91,15 +78,14 @@ public class GlobalKTableIntegrationTest {
private ForeachAction<String, String> foreachAction;
@Before
- public void before() throws InterruptedException {
+ public void before() throws Exception {
testNo++;
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "globalTableTopic-table-test-" + testNo;
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -111,16 +97,11 @@ public class GlobalKTableIntegrationTest {
.withValueSerde(Serdes.String()));
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
stream = builder.stream(streamTopic, stringLongConsumed);
- foreachAction = new ForeachAction<String, String>() {
- @Override
- public void apply(final String key, final String value) {
- results.put(key, value);
- }
- };
+ foreachAction = results::put;
}
@After
- public void whenShuttingDown() throws IOException {
+ public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
@@ -142,24 +123,22 @@ public class GlobalKTableIntegrationTest {
expected.put("d", "4+D");
expected.put("e", "5+null");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for initial values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for initial values");
produceGlobalTableValues();
- final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+
+ TestUtils.waitForCondition(
+ () -> "J".equals(replicatedStore.get(5L)),
+ 30000,
+ "waiting for data in replicated store");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "J".equals(replicatedStore.get(5L));
- }
- }, 30000, "waiting for data in replicated store");
produceTopicValues(streamTopic);
expected.put("a", "1+F");
@@ -168,12 +147,10 @@ public class GlobalKTableIntegrationTest {
expected.put("d", "4+I");
expected.put("e", "5+J");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for final values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for final values");
}
@Test
@@ -190,24 +167,21 @@ public class GlobalKTableIntegrationTest {
expected.put("c", "3+C");
expected.put("d", "4+D");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for initial values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for initial values");
produceGlobalTableValues();
- final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+ final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return "J".equals(replicatedStore.get(5L));
- }
- }, 30000, "waiting for data in replicated store");
+ TestUtils.waitForCondition(
+ () -> "J".equals(replicatedStore.get(5L)),
+ 30000,
+ "waiting for data in replicated store");
produceTopicValues(streamTopic);
@@ -217,12 +191,10 @@ public class GlobalKTableIntegrationTest {
expected.put("d", "4+I");
expected.put("e", "5+J");
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return results.equals(expected);
- }
- }, 30000L, "waiting for final values");
+ TestUtils.waitForCondition(
+ () -> results.equals(expected),
+ 30000L,
+ "waiting for final values");
}
@Test
@@ -245,7 +217,7 @@ public class GlobalKTableIntegrationTest {
assertThat(store.approximateNumEntries(), equalTo(4L));
}
- private void createTopics() throws InterruptedException {
+ private void createTopics() throws Exception {
streamTopic = "stream-" + testNo;
globalTableTopic = "globalTable-" + testNo;
CLUSTER.createTopics(streamTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index e0bec90..7c16927 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -17,30 +17,26 @@
package org.apache.kafka.streams.integration;
-import java.time.Duration;
+import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -48,14 +44,12 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
-import kafka.utils.MockTime;
-
import static org.junit.Assert.assertEquals;
@Category({IntegrationTest.class})
@@ -71,8 +65,7 @@ public class GlobalThreadShutDownOrderTest {
}
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
private final MockTime mockTime = CLUSTER.time;
private final String globalStore = "globalStore";
@@ -81,20 +74,17 @@ public class GlobalThreadShutDownOrderTest {
private KafkaStreams kafkaStreams;
private String globalStoreTopic;
private String streamTopic;
- private KStream<String, Long> stream;
- private List<Long> retrievedValuesList = new ArrayList<>();
+ private final List<Long> retrievedValuesList = new ArrayList<>();
private boolean firstRecordProcessed;
@Before
- public void before() throws InterruptedException {
-
+ public void before() throws Exception {
builder = new StreamsBuilder();
createTopics();
streamsConfiguration = new Properties();
final String applicationId = "global-thread-shutdown-test";
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -103,29 +93,26 @@ public class GlobalThreadShutDownOrderTest {
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
- final KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(Stores.persistentKeyValueStore(globalStore),
- Serdes.String(),
- Serdes.Long(),
- mockTime);
-
- builder.addGlobalStore(storeBuilder,
- globalStoreTopic,
- Consumed.with(Serdes.String(), Serdes.Long()),
- new MockProcessorSupplier());
+ final KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(
+ Stores.persistentKeyValueStore(globalStore),
+ Serdes.String(),
+ Serdes.Long(),
+ mockTime);
- stream = builder.stream(streamTopic, stringLongConsumed);
+ builder.addGlobalStore(
+ storeBuilder,
+ globalStoreTopic,
+ Consumed.with(Serdes.String(), Serdes.Long()),
+ new MockProcessorSupplier());
- stream.process(new ProcessorSupplier<String, Long>() {
- @Override
- public Processor<String, Long> get() {
- return new GlobalStoreProcessor(globalStore);
- }
- });
+ builder
+ .stream(streamTopic, stringLongConsumed)
+ .process(() -> new GlobalStoreProcessor(globalStore));
}
@After
- public void whenShuttingDown() throws IOException {
+ public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
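The process(() -> new GlobalStoreProcessor(globalStore)) call in the hunk above passes a lambda where an anonymous ProcessorSupplier used to be: ProcessorSupplier declares a single get() method, mirroring java.util.function.Supplier, so the lambda builds a fresh processor per call. A sketch with the JDK's Supplier standing in for ProcessorSupplier (illustrative, not the Streams API):

    import java.util.function.Supplier;

    public class ProcessorSupplierSketch {
        static final class GlobalStoreProcessor {
            GlobalStoreProcessor(final String storeName) {
                System.out.println("created processor for " + storeName);
            }
        }

        public static void main(final String[] args) {
            // One fresh processor per get() call, as each stream task requires:
            final Supplier<GlobalStoreProcessor> supplier = () -> new GlobalStoreProcessor("globalStore");
            supplier.get();
            supplier.get();
        }
    }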
@@ -141,12 +128,10 @@ public class GlobalThreadShutDownOrderTest {
kafkaStreams.start();
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return firstRecordProcessed;
- }
- }, 30000, "Has not processed record within 30 seconds");
+ TestUtils.waitForCondition(
+ () -> firstRecordProcessed,
+ 30000,
+ "Has not processed record within 30 seconds");
kafkaStreams.close(Duration.ofSeconds(30));
@@ -155,7 +140,7 @@ public class GlobalThreadShutDownOrderTest {
}
- private void createTopics() throws InterruptedException {
+ private void createTopics() throws Exception {
streamTopic = "stream-topic";
globalStoreTopic = "global-store-topic";
CLUSTER.createTopics(streamTopic);
@@ -186,7 +171,6 @@ public class GlobalThreadShutDownOrderTest {
private final String storeName;
GlobalStoreProcessor(final String storeName) {
-
this.storeName = storeName;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 1707679..32d77c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -18,7 +18,6 @@
package org.apache.kafka.streams.integration;
-import java.time.Duration;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
@@ -30,13 +29,11 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -45,7 +42,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -95,19 +92,14 @@ public class KTableSourceTopicRestartIntegrationTest {
@Before
public void before() {
final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC, Materialized.as("store"));
- kTable.toStream().foreach(new ForeachAction<String, String>() {
- @Override
- public void apply(final String key, final String value) {
- readKeyValues.put(key, value);
- }
- });
+ kTable.toStream().foreach(readKeyValues::put);
expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
}
@After
- public void after() throws IOException {
+ public void after() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
@@ -129,7 +121,10 @@ public class KTableSourceTopicRestartIntegrationTest {
produceKeyValues("f", "g", "h");
- assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+ assertNumberValuesRead(
+ readKeyValues,
+ expectedResultsWithDataWrittenDuringRestoreMap,
+ "Table did not get all values after restart");
} finally {
streamsOne.close(Duration.ofSeconds(5));
}
@@ -154,7 +149,10 @@ public class KTableSourceTopicRestartIntegrationTest {
produceKeyValues("f", "g", "h");
- assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+ assertNumberValuesRead(
+ readKeyValues,
+ expectedResultsWithDataWrittenDuringRestoreMap,
+ "Table did not get all values after restart");
} finally {
streamsOne.close(Duration.ofSeconds(5));
}
@@ -188,12 +186,7 @@ public class KTableSourceTopicRestartIntegrationTest {
final Map<String, String> expectedMap,
final String errorMessage) throws InterruptedException {
TestUtils.waitForCondition(
- new TestCondition() {
- @Override
- public boolean conditionMet() {
- return valueMap.equals(expectedMap);
- }
- },
+ () -> valueMap.equals(expectedMap),
30 * 1000L,
errorMessage);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 96d7d14..4c7859b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-import java.time.Duration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -44,14 +43,13 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
@Category({IntegrationTest.class})
public class PurgeRepartitionTopicIntegrationTest {
@@ -64,18 +62,18 @@ public class PurgeRepartitionTopicIntegrationTest {
private static AdminClient adminClient;
private static KafkaStreams kafkaStreams;
- private static Integer purgeIntervalMs = 10;
- private static Integer purgeSegmentBytes = 2000;
+ private static final Integer PURGE_INTERVAL_MS = 10;
+ private static final Integer PURGE_SEGMENT_BYTES = 2000;
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
{
- put("log.retention.check.interval.ms", purgeIntervalMs);
+ put("log.retention.check.interval.ms", PURGE_INTERVAL_MS);
put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 0);
}
});
- private Time time = CLUSTER.time;
+ private final Time time = CLUSTER.time;
private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {
@Override
@@ -92,11 +90,14 @@ public class PurgeRepartitionTopicIntegrationTest {
try {
final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, REPARTITION_TOPIC);
- final Config config = adminClient.describeConfigs(Collections.singleton(resource))
- .values().get(resource).get();
+ final Config config = adminClient
+ .describeConfigs(Collections.singleton(resource))
+ .values()
+ .get(resource)
+ .get();
return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().equals(TopicConfig.CLEANUP_POLICY_DELETE)
- && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(purgeIntervalMs.toString())
- && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(purgeSegmentBytes.toString());
+ && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(PURGE_INTERVAL_MS.toString())
+ && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString());
} catch (final Exception e) {
return false;
}
@@ -104,7 +105,6 @@ public class PurgeRepartitionTopicIntegrationTest {
}
private interface TopicSizeVerifier {
-
boolean verify(long currentSize);
}
@@ -117,18 +117,19 @@ public class PurgeRepartitionTopicIntegrationTest {
@Override
public final boolean conditionMet() {
- time.sleep(purgeIntervalMs);
+ time.sleep(PURGE_INTERVAL_MS);
try {
- final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo = adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
+ final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo =
+ adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
for (final DescribeLogDirsResponse.LogDirInfo partitionInfo : logDirInfo) {
- final DescribeLogDirsResponse.ReplicaInfo replicaInfo = partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
+ final DescribeLogDirsResponse.ReplicaInfo replicaInfo =
+ partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
if (replicaInfo != null && verifier.verify(replicaInfo.size)) {
return true;
}
}
-
} catch (final Exception e) {
// swallow
}
@@ -138,7 +139,7 @@ public class PurgeRepartitionTopicIntegrationTest {
}
@BeforeClass
- public static void createTopics() throws InterruptedException {
+ public static void createTopics() throws Exception {
CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
}
@@ -151,18 +152,17 @@ public class PurgeRepartitionTopicIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
- streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, PURGE_INTERVAL_MS);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath());
- streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), purgeIntervalMs);
- streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), purgeSegmentBytes);
- streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), purgeSegmentBytes / 2); // we cannot allow batch size larger than segment size
+ streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS);
+ streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
+ streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
final StreamsBuilder builder = new StreamsBuilder();
-
builder.stream(INPUT_TOPIC)
.groupBy(MockMapper.selectKeyKeyValueMapper())
.count();
@@ -171,15 +171,14 @@ public class PurgeRepartitionTopicIntegrationTest {
}
@After
- public void shutdown() throws IOException {
+ public void shutdown() {
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(30));
}
}
-
@Test
- public void shouldRestoreState() throws InterruptedException, ExecutionException {
+ public void shouldRestoreState() throws Exception {
// produce some data to input topic
final List<KeyValue<Integer, Integer>> messages = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
@@ -198,26 +197,16 @@ public class PurgeRepartitionTopicIntegrationTest {
"Repartition topic " + REPARTITION_TOPIC + " not created with
the expected configs after 60000 ms.");
TestUtils.waitForCondition(
- new RepartitionTopicVerified(new TopicSizeVerifier() {
- @Override
- public boolean verify(final long currentSize) {
- return currentSize > 0;
- }
- }),
- 60000,
- "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms."
+ new RepartitionTopicVerified(currentSize -> currentSize > 0),
+ 60000,
+ "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms."
);
// we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
TestUtils.waitForCondition(
- new RepartitionTopicVerified(new TopicSizeVerifier() {
- @Override
- public boolean verify(final long currentSize) {
- return currentSize <= purgeSegmentBytes;
- }
- }),
- 60000,
- "Repartition topic " + REPARTITION_TOPIC + " not purged data
after 60000 ms."
+ new RepartitionTopicVerified(currentSize -> currentSize <=
PURGE_SEGMENT_BYTES),
+ 60000,
+ "Repartition topic " + REPARTITION_TOPIC + " not purged data after
60000 ms."
);
}
}
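The two waitForCondition calls above shrink because TopicSizeVerifier, shown earlier in this diff, is a one-method interface, so currentSize -> currentSize > 0 and currentSize -> currentSize <= PURGE_SEGMENT_BYTES replace the anonymous classes. A self-contained sketch of that shape (the interface is redeclared locally for illustration):

    public class TopicSizeVerifierSketch {
        // Same shape as the test's private interface:
        interface TopicSizeVerifier {
            boolean verify(long currentSize);
        }

        public static void main(final String[] args) {
            final long purgeSegmentBytes = 2000L;
            final TopicSizeVerifier hasData = currentSize -> currentSize > 0;
            final TopicSizeVerifier purged = currentSize -> currentSize <= purgeSegmentBytes;
            System.out.println(hasData.verify(1500L)); // true
            System.out.println(purged.verify(1500L)); // true
        }
    }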
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 06014a1..856a85d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -99,8 +99,7 @@ public class QueryableStateIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static final int STREAM_THREE_PARTITIONS = 4;
private final MockTime mockTime = CLUSTER.time;
private String streamOne = "stream-one";
@@ -292,27 +291,31 @@ public class QueryableStateIntegrationTest {
final Set<String> keys,
final String storeName) throws Exception {
for (final String key : keys) {
- TestUtils.waitForCondition(() -> {
- try {
- final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-
- if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+
+ if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ return false;
+ }
+ final int index = metadata.hostInfo().port();
+ final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+ final ReadOnlyKeyValueStore<String, Long> store =
+ streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
+
+ return store != null && store.get(key) != null;
+ } catch (final IllegalStateException e) {
+ // Kafka Streams instance may have closed but rebalance hasn't happened
+ return false;
+ } catch (final InvalidStateStoreException e) {
+ // there must have been at least one rebalance state
+ assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
return false;
}
- final int index = metadata.hostInfo().port();
- final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
- final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
-
- return store != null && store.get(key) != null;
- } catch (final IllegalStateException e) {
- // Kafka Streams instance may have closed but rebalance hasn't happened
- return false;
- } catch (final InvalidStateStoreException e) {
- // there must have been at least one rebalance state
- assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
- return false;
- }
- }, 120000, "waiting for metadata, store and value to be non null");
+ },
+ 120000,
+ "waiting for metadata, store and value to be non null");
}
}
@@ -324,25 +327,29 @@ public class QueryableStateIntegrationTest {
final Long from,
final Long to) throws Exception {
for (final String key : keys) {
- TestUtils.waitForCondition(() -> {
- try {
- final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
- if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+ if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+ return false;
+ }
+ final int index = metadata.hostInfo().port();
+ final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+ final ReadOnlyWindowStore<String, Long> store =
+ streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
+ return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null;
+ } catch (final IllegalStateException e) {
+ // Kafka Streams instance may have closed but rebalance hasn't happened
+ return false;
+ } catch (final InvalidStateStoreException e) {
+ // there must have been at least one rebalance state
+ assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
return false;
}
- final int index = metadata.hostInfo().port();
- final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
- final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
- return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null;
- } catch (final IllegalStateException e) {
- // Kafka Streams instance may have closed but rebalance hasn't happened
- return false;
- } catch (final InvalidStateStoreException e) {
- // there must have been at least one rebalance state
- assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
- return false;
- }
- }, 120000, "waiting for metadata, store and value to be non null");
+ },
+ 120000,
+ "waiting for metadata, store and value to be non null");
}
}
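The windowed variant differs only in the store type and the time-bounded fetch; the ofEpochMilli calls exist because this fetch overload takes java.time.Instant bounds. A hedged sketch of reading one key's windows, assuming a running `streams` instance and illustrative names:

    final ReadOnlyWindowStore<String, Long> windowStore =
        streams.store("windowed-word-count-store", QueryableStoreTypes.windowStore());
    try (final WindowStoreIterator<Long> iter =
             windowStore.fetch("some-key", ofEpochMilli(0L), ofEpochMilli(System.currentTimeMillis()))) {
        while (iter.hasNext()) {
            final KeyValue<Long, Long> entry = iter.next(); // entry.key is the window start timestamp
            System.out.println("window " + entry.key + " -> " + entry.value);
        }
    }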
@@ -359,7 +366,13 @@ public class QueryableStateIntegrationTest {
final String storeName = "word-count-store";
final String windowStoreName = "windowed-word-count-store";
for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, i);
+            streamRunnables[i] = new StreamRunnable(
+                streamThree,
+                outputTopicThree,
+                outputTopicConcurrentWindowed,
+                storeName,
+                windowStoreName,
+                i);
streamThreads[i] = new Thread(streamRunnables[i]);
streamThreads[i].start();
}
@@ -368,10 +381,20 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
for (int i = 0; i < numThreads; i++) {
-            verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
+            verifyAllKVKeys(
+                streamRunnables,
+                streamRunnables[i].getStream(),
+                streamRunnables[i].getStateListener(),
+                inputValuesKeys,
                 storeName + "-" + streamThree);
-            verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
+            verifyAllWindowedKeys(
+                streamRunnables,
+                streamRunnables[i].getStream(),
+                streamRunnables[i].getStateListener(),
+                inputValuesKeys,
+                windowStoreName + "-" + streamThree,
+                0L,
+                WINDOW_SIZE);
             assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
}
@@ -383,10 +406,20 @@ public class QueryableStateIntegrationTest {
}
// query from the remaining thread
-        verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
+        verifyAllKVKeys(
+            streamRunnables,
+            streamRunnables[0].getStream(),
+            streamRunnables[0].getStateListener(),
+            inputValuesKeys,
             storeName + "-" + streamThree);
-        verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-            windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
+        verifyAllWindowedKeys(
+            streamRunnables,
+            streamRunnables[0].getStream(),
+            streamRunnables[0].getStateListener(),
+            inputValuesKeys,
+            windowStoreName + "-" + streamThree,
+            0L,
+            WINDOW_SIZE);
         assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
} finally {
for (int i = 0; i < numThreads; i++) {
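For readers wondering why metadata.hostInfo().port() can index into streamRunnables in the verification calls above: the test presumably registers each instance's array index as the port of its application.server config, so the metadata lookup doubles as instance routing. A sketch of that lookup, with the assumption flagged:

    // Assumption (not shown in this diff): instance i was configured with
    // application.server = "localhost:" + i, so the advertised port is an
    // index into streamRunnables rather than a real network port.
    final StreamsMetadata metadata =
        streams.metadataForKey(storeName, key, new StringSerializer());
    final KafkaStreams ownerOfKey = streamRunnables[metadata.hostInfo().port()].getStream();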
@@ -407,7 +440,13 @@ public class QueryableStateIntegrationTest {
         final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
-        kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, outputTopicConcurrentWindowed, storeName, windowStoreName, streamsConfiguration);
+        kafkaStreams = createCountStream(
+            streamConcurrent,
+            outputTopicConcurrent,
+            outputTopicConcurrentWindowed,
+            storeName,
+            windowStoreName,
+            streamsConfiguration);
kafkaStreams.start();
producerThread.start();
@@ -416,8 +455,8 @@ public class QueryableStateIntegrationTest {
         waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
         waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
-        final ReadOnlyKeyValueStore<String, Long>
-            keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long> keyValueStore =
+            kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore());
         final ReadOnlyWindowStore<String, Long> windowStore =
             kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore());
@@ -459,7 +498,8 @@ public class QueryableStateIntegrationTest {
new KeyValue<>(keys[3], 5L),
new KeyValue<>(keys[4], 2L))
);
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
@@ -535,7 +575,10 @@ public class QueryableStateIntegrationTest {
mockTime);
final KTable<String, String> t1 = builder.table(streamOne);
-        t1.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+        t1
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
.toStream()
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
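The reformatted mapValues call is also what makes this test queryable at all: Materialized.as("queryMapValues") names the backing key-value store, and that name is what gets passed to KafkaStreams#store further down. A compact sketch of the same wiring, with illustrative topic names:

    final StreamsBuilder builder = new StreamsBuilder();
    final KTable<String, String> table = builder.table("input-topic");
    table.mapValues(
            (ValueMapper<String, Long>) Long::valueOf,
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues")
                .withValueSerde(Serdes.Long()))
        .toStream()
        .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));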
@@ -544,9 +587,8 @@ public class QueryableStateIntegrationTest {
waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
-        final ReadOnlyKeyValueStore<String, Long>
-            myMapStore = kafkaStreams.store("queryMapValues",
-            QueryableStoreTypes.keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            kafkaStreams.store("queryMapValues", QueryableStoreTypes.keyValueStore());
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
}
@@ -566,7 +608,8 @@ public class QueryableStateIntegrationTest {
new KeyValue<>(keys[3], "5"),
new KeyValue<>(keys[4], "2"))
);
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
@@ -581,7 +624,10 @@ public class QueryableStateIntegrationTest {
         final Predicate<String, String> filterPredicate = (key, value) -> key.contains("kafka");
         final KTable<String, String> t1 = builder.table(streamOne);
         final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
-        final KTable<String, Long> t3 = t2.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
+        final KTable<String, Long> t3 = t2
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
         t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -596,7 +642,8 @@ public class QueryableStateIntegrationTest {
             assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
         }
         for (final KeyValue<String, String> batchEntry : batch1) {
-            final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+            final KeyValue<String, Long> batchEntryMapValue =
+                new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
if (!expectedBatch1.contains(batchEntryMapValue)) {
assertNull(myMapStore.get(batchEntry.key));
}
@@ -685,11 +732,19 @@ public class QueryableStateIntegrationTest {
mockTime);
final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
-        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
-        TestUtils.waitForCondition(() -> new Long(8).equals(store.get("hello")), maxWaitMs, "wait for count to be 8");
+        final ReadOnlyKeyValueStore<String, Long> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+
+        TestUtils.waitForCondition(
+            () -> new Long(8).equals(store.get("hello")),
+            maxWaitMs,
+            "wait for count to be 8");
// close stream
kafkaStreams.close();
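One nit the cleanup leaves in place: the condition above still boxes through the Long constructor. Not part of this commit, but the equivalent condition using the valueOf cache would read:

    TestUtils.waitForCondition(
        () -> Long.valueOf(8L).equals(store.get("hello")),
        maxWaitMs,
        "wait for count to be 8");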
@@ -699,14 +754,19 @@ public class QueryableStateIntegrationTest {
kafkaStreams.start();
// make sure we never get any value other than 8 for hello
-        TestUtils.waitForCondition(() -> {
-            try {
-                assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
-                return true;
-            } catch (final InvalidStateStoreException ise) {
-                return false;
-            }
-        }, maxWaitMs, "waiting for store " + storeName);
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    assertEquals(
+                        Long.valueOf(8L),
+                        kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+                    return true;
+                } catch (final InvalidStateStoreException ise) {
+                    return false;
+                }
+            },
+            maxWaitMs,
+            "waiting for store " + storeName);
}
@@ -756,7 +816,11 @@ public class QueryableStateIntegrationTest {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOne,
- Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"),
KeyValue.pair("b", "3"), KeyValue.pair("b", "4")),
+ Arrays.asList(
+ KeyValue.pair("a", "1"),
+ KeyValue.pair("a", "2"),
+ KeyValue.pair("b", "3"),
+ KeyValue.pair("b", "4")),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
@@ -765,9 +829,14 @@ public class QueryableStateIntegrationTest {
mockTime);
final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
-        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
+
+        final ReadOnlyKeyValueStore<String, String> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
TestUtils.waitForCondition(
() -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
@@ -784,21 +853,29 @@ public class QueryableStateIntegrationTest {
new Properties()),
mockTime);
-        TestUtils.waitForCondition(failed::get, 30000, "wait for thread to fail");
-
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
+        TestUtils.waitForCondition(
+            failed::get,
+            maxWaitMs,
+            "wait for thread to fail");
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
-        final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+        final ReadOnlyKeyValueStore<String, String> store2 =
+            kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
         try {
-            TestUtils.waitForCondition(() ->
-                ("125".equals(store2.get("a"))
-                || "1225".equals(store2.get("a"))
-                || "12125".equals(store2.get("a")))
-                &&
-                ("34".equals(store2.get("b"))
-                || "344".equals(store2.get("b"))
-                || "3434".equals(store2.get("b"))), maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
+            TestUtils.waitForCondition(
+                () -> ("125".equals(store2.get("a"))
+                    || "1225".equals(store2.get("a"))
+                    || "12125".equals(store2.get("a")))
+                    &&
+                    ("34".equals(store2.get("b"))
+                    || "344".equals(store2.get("b"))
+                    || "3434".equals(store2.get("b"))),
+                maxWaitMs,
+                "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
         } catch (final Throwable t) {
             throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: " + store2.get("b"), t);
         }
}
@@ -914,7 +991,8 @@ public class QueryableStateIntegrationTest {
}
-    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws Exception {
+    private void waitUntilAtLeastNumRecordProcessed(final String topic,
+                                                    final int numRecs) throws Exception {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
@@ -930,7 +1008,8 @@ public class QueryableStateIntegrationTest {
     private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
                                               final String key) {
-        final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
return Collections.singleton(KeyValue.pair(key, next.value));
@@ -940,7 +1019,8 @@ public class QueryableStateIntegrationTest {
     private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
                                        final String key) {
-        final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
if (fetch.hasNext()) {
final KeyValue<Long, Long> next = fetch.next();
return Collections.singletonMap(key, next.value);
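A side note on the two fetch helpers above: WindowStoreIterator extends KeyValueIterator, which is Closeable, and the snippets as shown never close the iterator. A hedged sketch of the same lookup with the iterator scoped, which the commit does not change:

    try (final WindowStoreIterator<Long> iter =
             store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()))) {
        return iter.hasNext()
            ? Collections.singleton(KeyValue.pair(key, iter.next().value))
            : Collections.emptySet();
    }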
@@ -958,7 +1038,9 @@ public class QueryableStateIntegrationTest {
private int currIteration = 0;
boolean shutdown = false;
-        ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
+        ProducerRunnable(final String topic,
+                         final List<String> inputValues,
+                         final int numIterations) {
this.topic = topic;
this.inputValues = inputValues;
this.numIterations = numIterations;