This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 99582b998b4c8251ed0d1469c2d3f361cdd5b8bf Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Aug 16 13:58:42 2018 +0200 [FLINK-10159][tests] Fail TestHarness.initializeState if harness has already been initialized This is a change in tests only. Previously it was technically possible to call first `harness.open()` followed by `harness.initializeState(fooBar)`. However this was incorrect, since `open()` was already calling `initializeState(null)`, which was leading to quirks. This commit adds a `checkState` which makes sure that `initializeState` is called only once. --- .../kafka/FlinkKafkaProducer011ITCase.java | 2 - .../api/checkpoint/ListCheckpointedTest.java | 30 +++++++++--- .../sink/TwoPhaseCommitSinkFunctionTest.java | 15 +++++- .../util/AbstractStreamOperatorTestHarness.java | 3 ++ .../AbstractStreamOperatorTestHarnessTest.java | 55 ++++++++++++++++++++++ 5 files changed, 95 insertions(+), 10 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 74c58ad..57b7e77 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -172,7 +172,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness.setup(); testHarness.open(); - testHarness.initializeState(null); testHarness.processElement(42, 0); testHarness.snapshot(0, 1); testHarness.processElement(43, 2); @@ -225,7 +224,6 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness1.setup(); testHarness1.open(); - testHarness1.initializeState(null); testHarness1.processElement(42, 0); testHarness1.snapshot(0, 1); testHarness1.processElement(43, 2); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java index d6d7591..644ab04 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java @@ -30,6 +30,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * Tests for {@link ListCheckpointed}. */ @@ -51,12 +54,25 @@ public class ListCheckpointedTest { } private static void testUDF(TestUserFunction userFunction) throws Exception { - AbstractStreamOperatorTestHarness<Integer> testHarness = - new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0); - testHarness.open(); - OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); - testHarness.initializeState(snapshot); - Assert.assertTrue(userFunction.isRestored()); + OperatorSubtaskState snapshot; + try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) { + testHarness.open(); + snapshot = testHarness.snapshot(0L, 0L); + assertFalse(userFunction.isRestored()); + } + try (AbstractStreamOperatorTestHarness<Integer> testHarness = createTestHarness(userFunction)) { + testHarness.initializeState(snapshot); + testHarness.open(); + assertTrue(userFunction.isRestored()); + } + } + + private static AbstractStreamOperatorTestHarness<Integer> createTestHarness(TestUserFunction userFunction) throws Exception { + return new AbstractStreamOperatorTestHarness<>( + new StreamMap<>(userFunction), + 1, + 1, + 0); } private static class TestUserFunction extends RichMapFunction<Integer, Integer> implements ListCheckpointed<Integer> { @@ -86,7 +102,7 @@ public class ListCheckpointedTest { if (null != expected) { Assert.assertEquals(expected, state); } else { - Assert.assertTrue(state.isEmpty()); + assertTrue(state.isEmpty()); } restored = true; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 166dc5a..2970b87 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -195,10 +195,14 @@ public class TwoPhaseCommitSinkFunctionTest { final OperatorSubtaskState snapshot = harness.snapshot(0, 1); harness.notifyOfCompletedCheckpoint(1); + throwException.set(true); + + closeTestHarness(); + setUpTestHarness(); + final long transactionTimeout = 1000; sinkFunction.setTransactionTimeout(transactionTimeout); sinkFunction.ignoreFailuresAfterTransactionTimeout(); - throwException.set(true); try { harness.initializeState(snapshot); @@ -251,11 +255,20 @@ public class TwoPhaseCommitSinkFunctionTest { final OperatorSubtaskState snapshot = harness.snapshot(0, 1); final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2); clock.setEpochMilli(elapsedTime); + + closeTestHarness(); + setUpTestHarness(); + sinkFunction.setTransactionTimeout(transactionTimeout); + sinkFunction.enableTransactionTimeoutWarnings(warningRatio); + harness.initializeState(snapshot); + harness.open(); final List<String> logMessages = loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList()); + closeTestHarness(); + assertThat( logMessages, hasItem(containsString("has been open for 502 ms. " + diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 2f19ce2..8ee8c03 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -80,6 +80,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.apache.flink.util.Preconditions.checkState; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -359,6 +360,8 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { OperatorSubtaskState jmOperatorStateHandles, OperatorSubtaskState tmOperatorStateHandles) throws Exception { + checkState(!initializeCalled, "TestHarness has already been initialized. Have you " + + "opened this harness before initializing it?"); if (!setupCalled) { setup(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java new file mode 100644 index 0000000..65f62a3 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.hamcrest.CoreMatchers.containsString; + +/** + * Tests for {@link AbstractStreamOperatorTestHarness}. + */ +public class AbstractStreamOperatorTestHarnessTest extends TestLogger { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testInitializeAfterOpenning() throws Throwable { + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage(containsString("TestHarness has already been initialized.")); + + AbstractStreamOperatorTestHarness<Integer> result; + result = + new AbstractStreamOperatorTestHarness<>( + new AbstractStreamOperator<Integer>() { + }, + 1, + 1, + 0); + result.setup(); + result.open(); + result.initializeState(new OperatorSubtaskState()); + } +}
