This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 142fade  MINOR: Fix Streams EOS tests (#5612)
142fade is described below

commit 142fadeca2ce8ea1dcb1471db53c39a894ab71cb
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Sep 5 10:50:22 2018 -0700

    MINOR: Fix Streams EOS tests (#5612)
    
    Back porting #5501 broke some tests.
    
    Reviewers: John Roesler <[email protected]>, Guozhang Wang <[email protected]>
---
 .../apache/kafka/streams/processor/internals/StreamTaskTest.java   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1406ee4..ad7ea11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1269,11 +1269,12 @@ public class StreamTaskTest {
 
     @Test
     public void shouldAlwaysCommitIfEosEnabled() {
+        task = createStatelessTask(true);
+
         final RecordCollectorImpl recordCollector =  new 
RecordCollectorImpl("StreamTask",
                 new LogContext("StreamTaskTest "), new 
DefaultProductionExceptionHandler());
         recordCollector.init(producer);
 
-        task = createStatelessTask(true);
         task.initializeStateStores();
         task.initializeTopology();
         task.punctuate(processorSystemTime, 5, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -1355,7 +1356,7 @@ public class StreamTaskTest {
     }
 
     // this task will throw exception when processing (on partition2), 
flushing, suspending and closing
-    private StreamTask createTaskThatThrowsException(final boolean enableEos) {
+    private StreamTask createTaskThatThrowsException(final boolean eosEnabled) 
{
         final ProcessorTopology topology = ProcessorTopology.withSources(
                 Utils.<ProcessorNode>mkList(source1, source3, 
processorStreamTime, processorSystemTime),
                 new HashMap<String, SourceNode>() {
@@ -1371,7 +1372,7 @@ public class StreamTaskTest {
         source1.addChild(processorSystemTime);
         source3.addChild(processorSystemTime);
 
-        return new StreamTask(taskId00, partitions, topology, consumer, 
changelogReader, config,
+        return new StreamTask(taskId00, partitions, topology, consumer, 
changelogReader, eosEnabled ? eosConfig : config,
             streamsMetrics, stateDirectory, null, time, new 
StreamTask.ProducerSupplier() {
                 @Override
                 public Producer<byte[], byte[]> get() {

Reply via email to