This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 142fade MINOR: Fix Streams EOS tests (#5612)
142fade is described below
commit 142fadeca2ce8ea1dcb1471db53c39a894ab71cb
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Sep 5 10:50:22 2018 -0700
MINOR: Fix Streams EOS tests (#5612)
Back porting #5501 broke some tests.
Reviewers: John Roesler <[email protected]>, Guozhang Wang
<[email protected]>
---
.../apache/kafka/streams/processor/internals/StreamTaskTest.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1406ee4..ad7ea11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1269,11 +1269,12 @@ public class StreamTaskTest {
@Test
public void shouldAlwaysCommitIfEosEnabled() {
+ task = createStatelessTask(true);
+
final RecordCollectorImpl recordCollector = new
RecordCollectorImpl("StreamTask",
new LogContext("StreamTaskTest "), new
DefaultProductionExceptionHandler());
recordCollector.init(producer);
- task = createStatelessTask(true);
task.initializeStateStores();
task.initializeTopology();
task.punctuate(processorSystemTime, 5,
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -1355,7 +1356,7 @@ public class StreamTaskTest {
}
// this task will throw exception when processing (on partition2), flushing, suspending and closing
- private StreamTask createTaskThatThrowsException(final boolean enableEos) {
+ private StreamTask createTaskThatThrowsException(final boolean eosEnabled)
{
final ProcessorTopology topology = ProcessorTopology.withSources(
Utils.<ProcessorNode>mkList(source1, source3,
processorStreamTime, processorSystemTime),
new HashMap<String, SourceNode>() {
@@ -1371,7 +1372,7 @@ public class StreamTaskTest {
source1.addChild(processorSystemTime);
source3.addChild(processorSystemTime);
- return new StreamTask(taskId00, partitions, topology, consumer,
changelogReader, config,
+ return new StreamTask(taskId00, partitions, topology, consumer,
changelogReader, eosEnabled ? eosConfig : config,
streamsMetrics, stateDirectory, null, time, new
StreamTask.ProducerSupplier() {
@Override
public Producer<byte[], byte[]> get() {