This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1be08c1401f5e9a94e3ee6b861d1d1e168b519a1
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Sun Oct 25 18:04:19 2020 +0800

    [hotfix] Replace finally block with JUnit After method in SourceOperatorTest.
---
 .../api/operators/SourceOperatorTest.java          | 83 +++++++++-------------
 1 file changed, 34 insertions(+), 49 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 8fbcf19..3b381fd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.util.CollectionUtil;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -74,6 +75,11 @@ public class SourceOperatorTest {
                this.operator = new TestingSourceOperator<>(mockSourceReader, 
mockGateway, SUBTASK_INDEX);
        }
 
+       @After
+       public void cleanUp() throws Exception {
+               operator.close();
+       }
+
        @Test
        public void testInitializeState() throws Exception {
                StateInitializationContext stateContext = getStateContext();
@@ -88,22 +94,16 @@ public class SourceOperatorTest {
                operator.initializeState(getStateContext());
                // Open the operator.
                operator.open();
-               try {
-                       // The source reader should have been assigned a split.
-                       assertEquals(Collections.singletonList(MOCK_SPLIT), 
mockSourceReader.getAssignedSplits());
-                       // The source reader should have started.
-                       assertTrue(mockSourceReader.isStarted());
-
-                       // A ReaderRegistrationRequest should have been sent.
-                       assertEquals(1, mockGateway.getEventsSent().size());
-                       OperatorEvent operatorEvent = 
mockGateway.getEventsSent().get(0);
-                       assertTrue(operatorEvent instanceof 
ReaderRegistrationEvent);
-                       assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) 
operatorEvent).subtaskId());
-
-               }
-               finally {
-                       operator.close();
-               }
+               // The source reader should have been assigned a split.
+               assertEquals(Collections.singletonList(MOCK_SPLIT), 
mockSourceReader.getAssignedSplits());
+               // The source reader should have started.
+               assertTrue(mockSourceReader.isStarted());
+
+               // A ReaderRegistrationRequest should have been sent.
+               assertEquals(1, mockGateway.getEventsSent().size());
+               OperatorEvent operatorEvent = 
mockGateway.getEventsSent().get(0);
+               assertTrue(operatorEvent instanceof ReaderRegistrationEvent);
+               assertEquals(SUBTASK_INDEX, ((ReaderRegistrationEvent) 
operatorEvent).subtaskId());
                assertTrue(mockSourceReader.isClosed());
        }
 
@@ -111,32 +111,22 @@ public class SourceOperatorTest {
        public void testHandleAddSplitsEvent() throws Exception {
                operator.initializeState(getStateContext());
                operator.open();
-               try {
-                       MockSourceSplit newSplit = new MockSourceSplit((2));
-                       operator.handleOperatorEvent(new AddSplitEvent<>(
-                               Collections.singletonList(newSplit), new 
MockSourceSplitSerializer()));
-                       // The source reader should have been assigned two 
splits.
-                       assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), 
mockSourceReader.getAssignedSplits());
-               }
-               finally {
-                       operator.close();
-               }
+               MockSourceSplit newSplit = new MockSourceSplit((2));
+               operator.handleOperatorEvent(new AddSplitEvent<>(
+                       Collections.singletonList(newSplit), new 
MockSourceSplitSerializer()));
+               // The source reader should have been assigned two splits.
+               assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), 
mockSourceReader.getAssignedSplits());
        }
 
        @Test
        public void testHandleAddSourceEvent() throws Exception {
                operator.initializeState(getStateContext());
                operator.open();
-               try {
-                       SourceEvent event = new SourceEvent() {
-                       };
-                       operator.handleOperatorEvent(new 
SourceEventWrapper(event));
-                       // The source reader should have been assigned two 
splits.
-                       assertEquals(Collections.singletonList(event), 
mockSourceReader.getReceivedSourceEvents());
-               }
-               finally {
-                       operator.close();
-               }
+               SourceEvent event = new SourceEvent() {
+               };
+               operator.handleOperatorEvent(new SourceEventWrapper(event));
+               // The source reader should have been assigned two splits.
+               assertEquals(Collections.singletonList(event), 
mockSourceReader.getReceivedSourceEvents());
        }
 
        @Test
@@ -155,19 +145,14 @@ public class SourceOperatorTest {
                StateInitializationContext stateContext = getStateContext();
                operator.initializeState(stateContext);
                operator.open();
-               try {
-                       MockSourceSplit newSplit = new MockSourceSplit((2));
-                       operator.handleOperatorEvent(new AddSplitEvent<>(
-                               Collections.singletonList(newSplit), new 
MockSourceSplitSerializer()));
-                       operator.snapshotState(new 
StateSnapshotContextSynchronousImpl(100L, 100L));
-
-                       // Verify the splits in state.
-                       List<MockSourceSplit> splitsInState = 
CollectionUtil.iterableToList(operator.getReaderState().get());
-                       assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), 
splitsInState);
-               }
-               finally {
-                       operator.close();
-               }
+               MockSourceSplit newSplit = new MockSourceSplit((2));
+               operator.handleOperatorEvent(new AddSplitEvent<>(
+                       Collections.singletonList(newSplit), new 
MockSourceSplitSerializer()));
+               operator.snapshotState(new 
StateSnapshotContextSynchronousImpl(100L, 100L));
+
+               // Verify the splits in state.
+               List<MockSourceSplit> splitsInState = 
CollectionUtil.iterableToList(operator.getReaderState().get());
+               assertEquals(Arrays.asList(MOCK_SPLIT, newSplit), 
splitsInState);
        }
 
        // ---------------- helper methods -------------------------

Reply via email to