This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 99582b998b4c8251ed0d1469c2d3f361cdd5b8bf
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Aug 16 13:58:42 2018 +0200

    [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized
    
    This is a change in tests only. Previously it was technically possible to first call
    `harness.open()` and then `harness.initializeState(fooBar)`. However, this was incorrect,
    since `open()` had already called `initializeState(null)`, which led to quirks. This commit
    adds a `checkState` which makes sure that `initializeState` is called only once.
---
 .../kafka/FlinkKafkaProducer011ITCase.java         |  2 -
 .../api/checkpoint/ListCheckpointedTest.java       | 30 +++++++++---
 .../sink/TwoPhaseCommitSinkFunctionTest.java       | 15 +++++-
 .../util/AbstractStreamOperatorTestHarness.java    |  3 ++
 .../AbstractStreamOperatorTestHarnessTest.java     | 55 ++++++++++++++++++++++
 5 files changed, 95 insertions(+), 10 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 74c58ad..57b7e77 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -172,7 +172,6 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
 
                testHarness.setup();
                testHarness.open();
-               testHarness.initializeState(null);
                testHarness.processElement(42, 0);
                testHarness.snapshot(0, 1);
                testHarness.processElement(43, 2);
@@ -225,7 +224,6 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
 
                testHarness1.setup();
                testHarness1.open();
-               testHarness1.initializeState(null);
                testHarness1.processElement(42, 0);
                testHarness1.snapshot(0, 1);
                testHarness1.processElement(43, 2);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
index d6d7591..644ab04 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java
@@ -30,6 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for {@link ListCheckpointed}.
  */
@@ -51,12 +54,25 @@ public class ListCheckpointedTest {
        }
 
        private static void testUDF(TestUserFunction userFunction) throws 
Exception {
-               AbstractStreamOperatorTestHarness<Integer> testHarness =
-                       new AbstractStreamOperatorTestHarness<>(new 
StreamMap<>(userFunction), 1, 1, 0);
-               testHarness.open();
-               OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.initializeState(snapshot);
-               Assert.assertTrue(userFunction.isRestored());
+               OperatorSubtaskState snapshot;
+               try (AbstractStreamOperatorTestHarness<Integer> testHarness = 
createTestHarness(userFunction)) {
+                       testHarness.open();
+                       snapshot = testHarness.snapshot(0L, 0L);
+                       assertFalse(userFunction.isRestored());
+               }
+               try (AbstractStreamOperatorTestHarness<Integer> testHarness = 
createTestHarness(userFunction)) {
+                       testHarness.initializeState(snapshot);
+                       testHarness.open();
+                       assertTrue(userFunction.isRestored());
+               }
+       }
+
+       private static AbstractStreamOperatorTestHarness<Integer> 
createTestHarness(TestUserFunction userFunction) throws Exception {
+               return new AbstractStreamOperatorTestHarness<>(
+                       new StreamMap<>(userFunction),
+                       1,
+                       1,
+                       0);
        }
 
        private static class TestUserFunction extends RichMapFunction<Integer, 
Integer> implements ListCheckpointed<Integer> {
@@ -86,7 +102,7 @@ public class ListCheckpointedTest {
                        if (null != expected) {
                                Assert.assertEquals(expected, state);
                        } else {
-                               Assert.assertTrue(state.isEmpty());
+                               assertTrue(state.isEmpty());
                        }
                        restored = true;
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 166dc5a..2970b87 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -195,10 +195,14 @@ public class TwoPhaseCommitSinkFunctionTest {
                final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
                harness.notifyOfCompletedCheckpoint(1);
 
+               throwException.set(true);
+
+               closeTestHarness();
+               setUpTestHarness();
+
                final long transactionTimeout = 1000;
                sinkFunction.setTransactionTimeout(transactionTimeout);
                sinkFunction.ignoreFailuresAfterTransactionTimeout();
-               throwException.set(true);
 
                try {
                        harness.initializeState(snapshot);
@@ -251,11 +255,20 @@ public class TwoPhaseCommitSinkFunctionTest {
                final OperatorSubtaskState snapshot = harness.snapshot(0, 1);
                final long elapsedTime = (long) ((double) transactionTimeout * 
warningRatio + 2);
                clock.setEpochMilli(elapsedTime);
+
+               closeTestHarness();
+               setUpTestHarness();
+               sinkFunction.setTransactionTimeout(transactionTimeout);
+               sinkFunction.enableTransactionTimeoutWarnings(warningRatio);
+
                harness.initializeState(snapshot);
+               harness.open();
 
                final List<String> logMessages =
                        
loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());
 
+               closeTestHarness();
+
                assertThat(
                        logMessages,
                        hasItem(containsString("has been open for 502 ms. " +
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2f19ce2..8ee8c03 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -80,6 +80,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -359,6 +360,8 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
                OperatorSubtaskState jmOperatorStateHandles,
                OperatorSubtaskState tmOperatorStateHandles) throws Exception {
 
+               checkState(!initializeCalled, "TestHarness has already been 
initialized. Have you " +
+                       "opened this harness before initializing it?");
                if (!setupCalled) {
                        setup();
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
new file mode 100644
index 0000000..65f62a3
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+/**
+ * Tests for {@link AbstractStreamOperatorTestHarness}.
+ */
+public class AbstractStreamOperatorTestHarnessTest extends TestLogger {
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
+       @Test
+       public void testInitializeAfterOpenning() throws Throwable {
+               expectedException.expect(IllegalStateException.class);
+               expectedException.expectMessage(containsString("TestHarness has 
already been initialized."));
+
+               AbstractStreamOperatorTestHarness<Integer> result;
+               result =
+                       new AbstractStreamOperatorTestHarness<>(
+                               new AbstractStreamOperator<Integer>() {
+                               },
+                               1,
+                               1,
+                               0);
+               result.setup();
+               result.open();
+               result.initializeState(new OperatorSubtaskState());
+       }
+}

Reply via email to