This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1be08c1401f5e9a94e3ee6b861d1d1e168b519a1 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sun Oct 25 18:04:19 2020 +0800 [hotfix] Replace finally block with JUnit After method in SourceOperatorTest. --- .../api/operators/SourceOperatorTest.java | 83 +++++++++------------- 1 file changed, 34 insertions(+), 49 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 8fbcf19..3b381fd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.util.CollectionUtil; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -74,6 +75,11 @@ public class SourceOperatorTest { this.operator = new TestingSourceOperator<>(mockSourceReader, mockGateway, SUBTASK_INDEX); } + @After + public void cleanUp() throws Exception { + operator.close(); + } + @Test public void testInitializeState() throws Exception { StateInitializationContext stateContext = getStateContext(); @@ -88,22 +94,16 @@ public class SourceOperatorTest { operator.initializeState(getStateContext()); // Open the operator. operator.open(); - try { - // The source reader should have been assigned a split. - assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits()); - // The source reader should have started. - assertTrue(mockSourceReader.isStarted()); - - // A ReaderRegistrationRequest should have been sent. - assertEquals(1, mockGateway.getEventsSent().size()); - OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0); - assertTrue(operatorEvent instanceof ReaderRegistrationEvent); - assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId()); - - } - finally { - operator.close(); - } + // The source reader should have been assigned a split. + assertEquals(Collections.singletonList(MOCK_SPLIT), mockSourceReader.getAssignedSplits()); + // The source reader should have started. + assertTrue(mockSourceReader.isStarted()); + + // A ReaderRegistrationRequest should have been sent. + assertEquals(1, mockGateway.getEventsSent().size()); + OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0); + assertTrue(operatorEvent instanceof ReaderRegistrationEvent); + assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) operatorEvent).subtaskId()); assertTrue(mockSourceReader.isClosed()); } @@ -111,32 +111,22 @@ public class SourceOperatorTest { public void testHandleAddSplitsEvent() throws Exception { operator.initializeState(getStateContext()); operator.open(); - try { - MockSourceSplit newSplit = new MockSourceSplit((2)); - operator.handleOperatorEvent(new AddSplitEvent<>( - Collections.singletonList(newSplit), new MockSourceSplitSerializer())); - // The source reader should have been assigned two splits. - assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits()); - } - finally { - operator.close(); - } + MockSourceSplit newSplit = new MockSourceSplit((2)); + operator.handleOperatorEvent(new AddSplitEvent<>( + Collections.singletonList(newSplit), new MockSourceSplitSerializer())); + // The source reader should have been assigned two splits. + assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), mockSourceReader.getAssignedSplits()); } @Test public void testHandleAddSourceEvent() throws Exception { operator.initializeState(getStateContext()); operator.open(); - try { - SourceEvent event = new SourceEvent() { - }; - operator.handleOperatorEvent(new SourceEventWrapper(event)); - // The source reader should have been assigned two splits. - assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents()); - } - finally { - operator.close(); - } + SourceEvent event = new SourceEvent() { + }; + operator.handleOperatorEvent(new SourceEventWrapper(event)); + // The source reader should have been assigned two splits. + assertEquals(Collections.singletonList(event), mockSourceReader.getReceivedSourceEvents()); } @Test @@ -155,19 +145,14 @@ public class SourceOperatorTest { StateInitializationContext stateContext = getStateContext(); operator.initializeState(stateContext); operator.open(); - try { - MockSourceSplit newSplit = new MockSourceSplit((2)); - operator.handleOperatorEvent(new AddSplitEvent<>( - Collections.singletonList(newSplit), new MockSourceSplitSerializer())); - operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L)); - - // Verify the splits in state. - List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get()); - assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState); - } - finally { - operator.close(); - } + MockSourceSplit newSplit = new MockSourceSplit((2)); + operator.handleOperatorEvent(new AddSplitEvent<>( + Collections.singletonList(newSplit), new MockSourceSplitSerializer())); + operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L)); + + // Verify the splits in state. + List<MockSourceSplit> splitsInState = CollectionUtil.iterableToList(operator.getReaderState().get()); + assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), splitsInState); } // ---------------- helper methods -------------------------
