This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d7812b4af7fa0e1c324104b05629d139dac1912d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Apr 1 11:38:31 2022 +0200

    [FLINK-26995][tests] Migrate tests to ITCases
    
    These classes were only renamed:
    - FileUploadHandlerTest requires exclusive usage of DeleteOnExitHooks
    - StreamTaskTimerTest requires exclusive access to StreamTask#TRIGGER_THREAD_GROUP
    - SinkTransformationTranslatorTest fails because it
            a) relies on initially starting out with a clean state in the streaming graph generation
            b) relies on test cases permanently mutating the state in the streaming graph generation
    
    Some tests of the following classes were moved into a separate ITCase:
    - MemoryExecutionGraphInfoStoreTest relies on static SignallingBlockingNoOpInvokable
    - StreamTaskTest requires exclusive usage of the OutputFlusher
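    
    The common thread in the list above is test isolation: each of these classes
    depends on static, process-global state, so it cannot safely share a JVM with
    other unit tests. As a purely illustrative sketch (hypothetical class and field
    names, not part of this commit), this is the kind of static coupling that
    forces a class out of the shared unit-test run:
    
        import org.junit.Test;
    
        import static org.junit.Assert.assertEquals;
    
        /** Illustration only: a test whose fixture lives in a static field. */
        public class SharedStaticStateExampleTest {
    
            // Process-global state standing in for any static test hook
            // (latches, thread groups, shutdown hooks, streaming graph generation, ...).
            static int globalInvocations = 0;
    
            @Test
            public void assumesCleanStaticState() {
                // Passes when this class has the JVM to itself; fails if anything else
                // running in the same fork already incremented the counter.
                globalInvocations++;
                assertEquals(1, globalInvocations);
            }
        }
    
    Such classes fit the ITCase execution, which (as the pom.xml hunk below shows)
    runs with reuseForks=false.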
---
 .../MemoryExecutionGraphInfoStoreITCase.java       |  67 +++++++
 .../MemoryExecutionGraphInfoStoreTest.java         |  30 ---
 ...ndlerTest.java => FileUploadHandlerITCase.java} |   2 +-
 ...ava => SinkTransformationTranslatorITCase.java} |  10 +-
 ...skTimerTest.java => StreamTaskTimerITCase.java} |   2 +-
 .../operators/TestProcessingTimeServiceTest.java   |   2 +-
 .../runtime/tasks/LocalStateForwardingTest.java    |   2 +-
 .../streaming/runtime/tasks/StreamTaskITCase.java  | 213 +++++++++++++++++++++
 .../streaming/runtime/tasks/StreamTaskTest.java    | 182 +-----------------
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   2 +-
 .../runtime/tasks/SynchronousCheckpointTest.java   |   2 +-
 .../JobMasterStopWithSavepointITCase.java          |   2 +-
 pom.xml                                            |   3 +-
 13 files changed, 297 insertions(+), 222 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java
new file mode 100644
index 00000000000..affa04550c5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Tests for the {@link MemoryExecutionGraphInfoStore}. */
+public class MemoryExecutionGraphInfoStoreITCase extends TestLogger {
+
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+
+    /** Tests that a session cluster can terminate gracefully when jobs are still running. */
+    @Test
+    public void testPutSuspendedJobOnClusterShutdown() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.JOB_STORE_TYPE, JobManagerOptions.JobStoreType.Memory);
+        try (final MiniCluster miniCluster =
+                new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster(
+                        new MiniClusterConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .build(),
+                        new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
+            miniCluster.start();
+            final JobVertex vertex = new JobVertex("blockingVertex");
+            // The adaptive scheduler expects that every vertex has a configured parallelism
+            vertex.setParallelism(1);
+            vertex.setInvokableClass(
+                    ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class);
+            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
+            miniCluster.submitJob(jobGraph);
+            ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await();
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java
index c1cb28833c0..350f39dc601 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java
@@ -21,16 +21,9 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.util.ManualTicker;
@@ -240,29 +233,6 @@ public class MemoryExecutionGraphInfoStoreTest extends TestLogger {
         }
     }
 
-    /** Tests that a session cluster can terminate gracefully when jobs are still running. */
-    @Test
-    public void testPutSuspendedJobOnClusterShutdown() throws Exception {
-        Configuration configuration = new Configuration();
-        configuration.set(JobManagerOptions.JOB_STORE_TYPE, JobManagerOptions.JobStoreType.Memory);
-        try (final MiniCluster miniCluster =
-                new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster(
-                        new MiniClusterConfiguration.Builder()
-                                .setConfiguration(configuration)
-                                .build(),
-                        new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) {
-            miniCluster.start();
-            final JobVertex vertex = new JobVertex("blockingVertex");
-            // The adaptive scheduler expects that every vertex has a configured parallelism
-            vertex.setParallelism(1);
-            vertex.setInvokableClass(
-                    ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class);
-            final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
-            miniCluster.submitJob(jobGraph);
-            ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await();
-        }
-    }
-
     private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException {
         final ExecutionGraphInfo dummyExecutionGraphInfo =
                 new ExecutionGraphInfo(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
similarity index 99%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
index 76170f652ad..632f57ce878 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
@@ -65,7 +65,7 @@ import static org.junit.Assert.fail;
  * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files
  * and/or json are properly handled.
  */
-public class FileUploadHandlerTest extends TestLogger {
+public class FileUploadHandlerITCase extends TestLogger {
 
     @Rule
     public final MultipartUploadResource multipartUpdateResource = new MultipartUploadResource();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
similarity index 98%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
index e2086ec63ac..4d0b2323c78 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java
@@ -50,9 +50,13 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
-/** Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. */
+/**
+ * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}.
+ *
+ * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases.
+ */
 @RunWith(Parameterized.class)
-public class SinkTransformationTranslatorTest extends TestLogger {
+public class SinkTransformationTranslatorITCase extends TestLogger {
 
     @Parameterized.Parameters(name = "Execution Mode: {0}")
     public static Collection<Object> data() {
@@ -362,7 +366,7 @@ public class SinkTransformationTranslatorTest extends TestLogger {
     private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
         dataStreamSink.name(NAME);
         dataStreamSink.uid(UID);
-        dataStreamSink.setParallelism(SinkTransformationTranslatorTest.PARALLELISM);
+        dataStreamSink.setParallelism(SinkTransformationTranslatorITCase.PARALLELISM);
         dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
     }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerITCase.java
similarity index 99%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerITCase.java
index e03ee80776c..81c5b60f4a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerITCase.java
@@ -53,7 +53,7 @@ import static org.junit.Assert.assertThat;
 
 /** Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */
 @SuppressWarnings("serial")
-public class StreamTaskTimerTest extends TestLogger {
+public class StreamTaskTimerITCase extends TestLogger {
 
     private StreamTaskTestHarness<?> testHarness;
     private ProcessingTimeService timeService;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index dd8a746e139..d9d0a5c8ec7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -49,7 +49,7 @@ public class TestProcessingTimeServiceTest {
         StreamConfig streamConfig = testHarness.getStreamConfig();
 
         StreamMap<String, String> mapOperator =
-                new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<>());
+                new StreamMap<>(new StreamTaskTimerITCase.DummyMapFunction<>());
         streamConfig.setStreamOperator(mapOperator);
         streamConfig.setOperatorID(new OperatorID());
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index dbead9e7f89..2b36239b166 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -98,7 +98,7 @@ public class LocalStateForwardingTest extends TestLogger {
                         0,
                         taskStateManager);
 
-        StreamTask testStreamTask = new StreamTaskTest.NoOpStreamTask(streamMockEnvironment);
+        StreamTask testStreamTask = new StreamTaskITCase.NoOpStreamTask(streamMockEnvironment);
         CheckpointMetaData checkpointMetaData = new CheckpointMetaData(0L, 0L);
         CheckpointMetricsBuilder checkpointMetrics = new CheckpointMetricsBuilder();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.java
new file mode 100644
index 00000000000..3423fa341a8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
+import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
+import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD;
+import static org.apache.flink.runtime.io.network.api.writer.RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link StreamTask}. */
+public class StreamTaskITCase extends TestLogger {
+    @ClassRule
+    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorResource();
+
+    @Test
+    public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
+    }
+
+    @Test
+    public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
+    }
+
+    @Test
+    public void testRecordWriterClosedOnTransitRunningStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
+    }
+
+    private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState)
+            throws Exception {
+        // Throw the exception when the state updating to the expected one.
+        NoOpTaskManagerActions taskManagerActions =
+                new NoOpTaskManagerActions() {
+                    @Override
+                    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+                        if (taskExecutionState.getExecutionState() == executionState) {
+                            throw new ExpectedTestException();
+                        }
+                    }
+                };
+
+        testRecordWriterClosedOnError(
+                env ->
+                        taskBuilderWithConfiguredRecordWriter(env)
+                                .setTaskManagerActions(taskManagerActions)
+                                .build(EXECUTOR_RESOURCE.getExecutor()));
+    }
+
+    @Test
+    public void testFailInEndOfConstructor() throws Exception {
+        Configuration conf = new Configuration();
+        // Set the wrong setting type for forcing the fail during read.
+        conf.setString(BUFFER_DEBLOAT_PERIOD.key(), "a");
+        testRecordWriterClosedOnError(
+                env ->
+                        taskBuilderWithConfiguredRecordWriter(env)
+                                .setTaskManagerConfig(conf)
+                                .build(EXECUTOR_RESOURCE.getExecutor()));
+    }
+
+    private void testRecordWriterClosedOnError(
+            FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider)
+            throws Exception {
+        try (NettyShuffleEnvironment shuffleEnvironment =
+                new NettyShuffleEnvironmentBuilder().build()) {
+            Task task = taskProvider.apply(shuffleEnvironment);
+
+            task.startTaskThread();
+            task.getExecutingThread().join();
+
+            assertEquals(ExecutionState.FAILED, task.getExecutionState());
+            for (Thread thread : Thread.getAllStackTraces().keySet()) {
+                assertThat(
+                        thread.getName(),
+                        CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME))));
+            }
+        }
+    }
+
+    private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(
+            NettyShuffleEnvironment shuffleEnvironment) {
+        Configuration taskConfiguration = new Configuration();
+        outputEdgeConfiguration(taskConfiguration);
+
+        ResultPartitionDeploymentDescriptor descriptor =
+                new ResultPartitionDeploymentDescriptor(
+                        PartitionDescriptorBuilder.newBuilder().build(),
+                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+                        1,
+                        false);
+        return new TestTaskBuilder(shuffleEnvironment)
+                .setInvokable(NoOpStreamTask.class)
+                .setTaskConfig(taskConfiguration)
+                .setResultPartitions(singletonList(descriptor));
+    }
+
+    /**
+     * Make sure that there is some output edge in the config so that some RecordWriter is created.
+     */
+    private void outputEdgeConfiguration(Configuration taskConfiguration) {
+        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
+        streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());
+
+        StreamConfigChainer cfg =
+                new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1);
+        // The OutputFlusher thread is started only if the buffer timeout more than 0(default value
+        // is 0).
+        cfg.setBufferTimeout(1);
+        cfg.chain(
+                new OperatorID(44, 44),
+                new UnusedOperatorFactory(),
+                StringSerializer.INSTANCE,
+                StringSerializer.INSTANCE,
+                false);
+        cfg.finish();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Test Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Operator that does nothing.
+     *
+     * @param <T>
+     * @param <OP>
+     */
+    public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
+
+        public NoOpStreamTask(Environment environment) throws Exception {
+            super(environment);
+        }
+
+        @Override
+        protected void init() throws Exception {
+            inputProcessor = new StreamTaskTest.EmptyInputProcessor();
+        }
+
+        @Override
+        protected void cleanUpInternal() throws Exception {}
+    }
+
+    // ------------------------------------------------------------------------
+    // ------------------------------------------------------------------------
+
+    private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> {
+        @Override
+        public <T extends StreamOperator<String>> T createStreamOperator(
+                StreamOperatorParameters<String> parameters) {
+            throw new UnsupportedOperationException("This shouldn't be called");
+        }
+
+        @Override
+        public void setChainingStrategy(ChainingStrategy strategy) {}
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6e51e7a61d7..6a2d576c8a6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTim
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.SavepointFormatType;
@@ -45,7 +44,6 @@ import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -66,7 +64,6 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
-import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -102,7 +99,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
-import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -110,15 +106,12 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
@@ -134,17 +127,14 @@ import org.apache.flink.util.CloseableIterable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.TestingUncaughtExceptionHandler;
 import org.apache.flink.util.function.BiConsumerWithException;
-import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.SupplierWithException;
 
-import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.ClassRule;
@@ -183,7 +173,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
 import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED;
@@ -193,15 +182,12 @@ import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_T
 import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE;
 import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
-import static org.apache.flink.runtime.io.network.api.writer.RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME;
 import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
 import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.CoreMatchers.both;
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -1183,110 +1169,6 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
-    @Test
-    public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception {
-        testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
-    }
-
-    @Test
-    public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception {
-        testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
-    }
-
-    @Test
-    public void testRecordWriterClosedOnTransitRunningStateError() throws Exception {
-        testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
-    }
-
-    private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState)
-            throws Exception {
-        // Throw the exception when the state updating to the expected one.
-        NoOpTaskManagerActions taskManagerActions =
-                new NoOpTaskManagerActions() {
-                    @Override
-                    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-                        if (taskExecutionState.getExecutionState() == executionState) {
-                            throw new ExpectedTestException();
-                        }
-                    }
-                };
-
-        testRecordWriterClosedOnError(
-                env ->
-                        taskBuilderWithConfiguredRecordWriter(env)
-                                .setTaskManagerActions(taskManagerActions)
-                                .build(EXECUTOR_RESOURCE.getExecutor()));
-    }
-
-    @Test
-    public void testFailInEndOfConstructor() throws Exception {
-        Configuration conf = new Configuration();
-        // Set the wrong setting type for forcing the fail during read.
-        conf.setString(BUFFER_DEBLOAT_PERIOD.key(), "a");
-        testRecordWriterClosedOnError(
-                env ->
-                        taskBuilderWithConfiguredRecordWriter(env)
-                                .setTaskManagerConfig(conf)
-                                .build(EXECUTOR_RESOURCE.getExecutor()));
-    }
-
-    private void testRecordWriterClosedOnError(
-            FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider)
-            throws Exception {
-        try (NettyShuffleEnvironment shuffleEnvironment =
-                new NettyShuffleEnvironmentBuilder().build()) {
-            Task task = taskProvider.apply(shuffleEnvironment);
-
-            task.startTaskThread();
-            task.getExecutingThread().join();
-
-            assertEquals(ExecutionState.FAILED, task.getExecutionState());
-            for (Thread thread : Thread.getAllStackTraces().keySet()) {
-                assertThat(
-                        thread.getName(),
-                        CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME))));
-            }
-        }
-    }
-
-    private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(
-            NettyShuffleEnvironment shuffleEnvironment) {
-        Configuration taskConfiguration = new Configuration();
-        outputEdgeConfiguration(taskConfiguration);
-
-        ResultPartitionDeploymentDescriptor descriptor =
-                new ResultPartitionDeploymentDescriptor(
-                        PartitionDescriptorBuilder.newBuilder().build(),
-                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
-                        1,
-                        false);
-        return new TestTaskBuilder(shuffleEnvironment)
-                .setInvokable(NoOpStreamTask.class)
-                .setTaskConfig(taskConfiguration)
-                .setResultPartitions(singletonList(descriptor));
-    }
-
-    /**
-     * Make sure that there is some output edge in the config so that some RecordWriter is created.
-     */
-    private void outputEdgeConfiguration(Configuration taskConfiguration) {
-        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
-        streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());
-
-        StreamConfigChainer cfg =
-                new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1);
-        // The OutputFlusher thread is started only if the buffer timeout more than 0(default value
-        // is 0).
-        cfg.setBufferTimeout(1);
-        cfg.chain(
-                new OperatorID(44, 44),
-                new UnusedOperatorFactory(),
-                StringSerializer.INSTANCE,
-                StringSerializer.INSTANCE,
-                false);
-        cfg.finish();
-    }
-
     @Test
     public void testProcessWithAvailableOutput() throws Exception {
         try (final MockEnvironment environment = setupEnvironment(true, true)) {
@@ -1997,27 +1879,6 @@ public class StreamTaskTest extends TestLogger {
         return new RunningTask<>(taskCreationFuture.get(), invocationFuture);
     }
 
-    /**
-     * Operator that does nothing.
-     *
-     * @param <T>
-     * @param <OP>
-     */
-    public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> {
-
-        public NoOpStreamTask(Environment environment) throws Exception {
-            super(environment);
-        }
-
-        @Override
-        protected void init() throws Exception {
-            inputProcessor = new EmptyInputProcessor();
-        }
-
-        @Override
-        protected void cleanUpInternal() throws Exception {}
-    }
-
     /**
      * A stream input processor implementation used to control the returned input status based on
      * the total number of processing calls.
@@ -2083,31 +1944,6 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
-    private static class BlockingFinishStreamOperator extends AbstractStreamOperator<Void> {
-        private static final long serialVersionUID = -9042150529568008847L;
-
-        private static volatile OneShotLatch inFinish;
-        private static volatile OneShotLatch finishClose;
-
-        @Override
-        public void finish() throws Exception {
-            checkLatches();
-            inFinish.trigger();
-            finishClose.await();
-            super.close();
-        }
-
-        private void checkLatches() {
-            Preconditions.checkNotNull(inFinish);
-            Preconditions.checkNotNull(finishClose);
-        }
-
-        private static void resetLatches() {
-            inFinish = new OneShotLatch();
-            finishClose = new OneShotLatch();
-        }
-    }
-
     public static Task createTask(
             Class<? extends TaskInvokable> invokable,
             ShuffleEnvironment shuffleEnvironment,
@@ -2241,7 +2077,7 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
-    private static class EmptyInputProcessor implements StreamInputProcessor {
+    static class EmptyInputProcessor implements StreamInputProcessor {
         private volatile boolean isFinished;
 
         public EmptyInputProcessor() {
@@ -2688,22 +2524,6 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
-    private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> {
-        @Override
-        public <T extends StreamOperator<String>> T createStreamOperator(
-                StreamOperatorParameters<String> parameters) {
-            throw new UnsupportedOperationException("This shouldn't be called");
-        }
-
-        @Override
-        public void setChainingStrategy(ChainingStrategy strategy) {}
-
-        @Override
-        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
-            throw new UnsupportedOperationException();
-        }
-    }
-
     private static class ClosingOperator<T> extends AbstractStreamOperator<T>
             implements OneInputStreamOperator<T, T> {
         static AtomicBoolean closed = new AtomicBoolean();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 246814baa1e..ef37551a02f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -62,7 +62,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.NoOpStreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 import org.apache.flink.util.ExceptionUtils;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
index 6af59484f55..4a820687a50 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.NoOpStreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 
 import org.junit.Before;
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
index cabdc0c3bd5..102293c7497 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.NoOpStreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.ExceptionUtils;
diff --git a/pom.xml b/pom.xml
index 00384ec13be..6e3b0f90b9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1594,7 +1594,6 @@ under the License.
                                                <test.randomization.seed>${test.randomization.seed}</test.randomization.seed>
                                                <junit.jupiter.extensions.autodetection.enabled>true</junit.jupiter.extensions.autodetection.enabled>
                                        </systemPropertyVariables>
-                                       <argLine>-Xms256m -Xmx2048m -XX:+UseG1GC</argLine>
                                </configuration>
                                <executions>
                                        <!--execute all the unit tests-->
@@ -1608,6 +1607,7 @@ under the License.
                                                        <includes>
                                                                <include>${test.unit.pattern}</include>
                                                        </includes>
+                                                       <argLine>-Xms256m -Xmx1792m -XX:+UseG1GC</argLine>
                                                </configuration>
                                        </execution>
                                        <!--execute all the integration tests-->
@@ -1627,6 +1627,7 @@ under the License.
                                                                     e.g., 'org.apache.flink.api.scala.typeutils.Foo$Bar$Foobar'. -->
                                                                <exclude>**/*$*</exclude>
                                                        </excludes>
+                                                       <argLine>-Xms256m -Xmx2048m -XX:+UseG1GC</argLine>
                                                        <reuseForks>false</reuseForks>
                                                </configuration>
                                        </execution>
