This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d7812b4af7fa0e1c324104b05629d139dac1912d Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Apr 1 11:38:31 2022 +0200 [FLINK-26995][tests] Migrate tests to ITCases These classes were only renamed: - FileUploadHandlerTest requires exclusive usage of DeleteOnExitHooks - StreamTaskTimerTest requires exclusive access to to StreamTask#TRIGGER_THREAD_GROUP - SinkTransformationTranslatorTest fails it a) relies on initially starting out with a clean state in the streaming graph generation b) relies on test cases permanently mutating the state in the streaming graph generation Some tests of the following classes were moved into a separate ITCase: - MemoryExecutionGraphInfoStoreTest relies on static SignallingBlockingNoOpInvokable - StreamTaskTest requires exclusive usage of the OutputFlusher --- .../MemoryExecutionGraphInfoStoreITCase.java | 67 +++++++ .../MemoryExecutionGraphInfoStoreTest.java | 30 --- ...ndlerTest.java => FileUploadHandlerITCase.java} | 2 +- ...ava => SinkTransformationTranslatorITCase.java} | 10 +- ...skTimerTest.java => StreamTaskTimerITCase.java} | 2 +- .../operators/TestProcessingTimeServiceTest.java | 2 +- .../runtime/tasks/LocalStateForwardingTest.java | 2 +- .../streaming/runtime/tasks/StreamTaskITCase.java | 213 +++++++++++++++++++++ .../streaming/runtime/tasks/StreamTaskTest.java | 182 +----------------- .../tasks/SubtaskCheckpointCoordinatorTest.java | 2 +- .../runtime/tasks/SynchronousCheckpointTest.java | 2 +- .../JobMasterStopWithSavepointITCase.java | 2 +- pom.xml | 3 +- 13 files changed, 297 insertions(+), 222 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java new file mode 100644 index 00000000000..affa04550c5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreITCase.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.concurrent.ScheduledExecutorService; + +/** Tests for the {@link MemoryExecutionGraphInfoStore}. */ +public class MemoryExecutionGraphInfoStoreITCase extends TestLogger { + + @ClassRule + public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorResource(); + + /** Tests that a session cluster can terminate gracefully when jobs are still running. */ + @Test + public void testPutSuspendedJobOnClusterShutdown() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.JOB_STORE_TYPE, JobManagerOptions.JobStoreType.Memory); + try (final MiniCluster miniCluster = + new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster( + new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .build(), + new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { + miniCluster.start(); + final JobVertex vertex = new JobVertex("blockingVertex"); + // The adaptive scheduler expects that every vertex has a configured parallelism + vertex.setParallelism(1); + vertex.setInvokableClass( + ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class); + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex); + miniCluster.submitJob(jobGraph); + ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java index c1cb28833c0..350f39dc601 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStoreTest.java @@ -21,16 +21,9 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; -import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.JobsOverview; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.util.ManualTicker; @@ -240,29 +233,6 @@ public class MemoryExecutionGraphInfoStoreTest extends TestLogger { } } - /** Tests that a session cluster can terminate gracefully when jobs are still running. */ - @Test - public void testPutSuspendedJobOnClusterShutdown() throws Exception { - Configuration configuration = new Configuration(); - configuration.set(JobManagerOptions.JOB_STORE_TYPE, JobManagerOptions.JobStoreType.Memory); - try (final MiniCluster miniCluster = - new ExecutionGraphInfoStoreTestUtils.PersistingMiniCluster( - new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .build(), - new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))) { - miniCluster.start(); - final JobVertex vertex = new JobVertex("blockingVertex"); - // The adaptive scheduler expects that every vertex has a configured parallelism - vertex.setParallelism(1); - vertex.setInvokableClass( - ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.class); - final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex); - miniCluster.submitJob(jobGraph); - ExecutionGraphInfoStoreTestUtils.SignallingBlockingNoOpInvokable.LATCH.await(); - } - } - private void assertPutJobGraphWithStatus(JobStatus jobStatus) throws IOException { final ExecutionGraphInfo dummyExecutionGraphInfo = new ExecutionGraphInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java similarity index 99% rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java index 76170f652ad..632f57ce878 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java @@ -65,7 +65,7 @@ import static org.junit.Assert.fail; * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files * and/or json are properly handled. */ -public class FileUploadHandlerTest extends TestLogger { +public class FileUploadHandlerITCase extends TestLogger { @Rule public final MultipartUploadResource multipartUpdateResource = new MultipartUploadResource(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java similarity index 98% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java index e2086ec63ac..4d0b2323c78 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCase.java @@ -50,9 +50,13 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -/** Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. */ +/** + * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. + * + * <p>ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases. + */ @RunWith(Parameterized.class) -public class SinkTransformationTranslatorTest extends TestLogger { +public class SinkTransformationTranslatorITCase extends TestLogger { @Parameterized.Parameters(name = "Execution Mode: {0}") public static Collection<Object> data() { @@ -362,7 +366,7 @@ public class SinkTransformationTranslatorTest extends TestLogger { private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) { dataStreamSink.name(NAME); dataStreamSink.uid(UID); - dataStreamSink.setParallelism(SinkTransformationTranslatorTest.PARALLELISM); + dataStreamSink.setParallelism(SinkTransformationTranslatorITCase.PARALLELISM); dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerITCase.java similarity index 99% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerITCase.java index e03ee80776c..81c5b60f4a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerITCase.java @@ -53,7 +53,7 @@ import static org.junit.Assert.assertThat; /** Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */ @SuppressWarnings("serial") -public class StreamTaskTimerTest extends TestLogger { +public class StreamTaskTimerITCase extends TestLogger { private StreamTaskTestHarness<?> testHarness; private ProcessingTimeService timeService; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index dd8a746e139..d9d0a5c8ec7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -49,7 +49,7 @@ public class TestProcessingTimeServiceTest { StreamConfig streamConfig = testHarness.getStreamConfig(); StreamMap<String, String> mapOperator = - new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<>()); + new StreamMap<>(new StreamTaskTimerITCase.DummyMapFunction<>()); streamConfig.setStreamOperator(mapOperator); streamConfig.setOperatorID(new OperatorID()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index dbead9e7f89..2b36239b166 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -98,7 +98,7 @@ public class LocalStateForwardingTest extends TestLogger { 0, taskStateManager); - StreamTask testStreamTask = new StreamTaskTest.NoOpStreamTask(streamMockEnvironment); + StreamTask testStreamTask = new StreamTaskITCase.NoOpStreamTask(streamMockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(0L, 0L); CheckpointMetricsBuilder checkpointMetrics = new CheckpointMetricsBuilder(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.java new file mode 100644 index 00000000000..3423fa341a8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskITCase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder; +import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TestTaskBuilder; +import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.hamcrest.CoreMatchers; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.concurrent.ScheduledExecutorService; + +import static java.util.Collections.singletonList; +import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_PERIOD; +import static org.apache.flink.runtime.io.network.api.writer.RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +/** Tests for {@link StreamTask}. */ +public class StreamTaskITCase extends TestLogger { + @ClassRule + public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorResource(); + + @Test + public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception { + testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING); + } + + @Test + public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception { + testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING); + } + + @Test + public void testRecordWriterClosedOnTransitRunningStateError() throws Exception { + testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING); + } + + private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState) + throws Exception { + // Throw the exception when the state updating to the expected one. + NoOpTaskManagerActions taskManagerActions = + new NoOpTaskManagerActions() { + @Override + public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { + if (taskExecutionState.getExecutionState() == executionState) { + throw new ExpectedTestException(); + } + } + }; + + testRecordWriterClosedOnError( + env -> + taskBuilderWithConfiguredRecordWriter(env) + .setTaskManagerActions(taskManagerActions) + .build(EXECUTOR_RESOURCE.getExecutor())); + } + + @Test + public void testFailInEndOfConstructor() throws Exception { + Configuration conf = new Configuration(); + // Set the wrong setting type for forcing the fail during read. + conf.setString(BUFFER_DEBLOAT_PERIOD.key(), "a"); + testRecordWriterClosedOnError( + env -> + taskBuilderWithConfiguredRecordWriter(env) + .setTaskManagerConfig(conf) + .build(EXECUTOR_RESOURCE.getExecutor())); + } + + private void testRecordWriterClosedOnError( + FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider) + throws Exception { + try (NettyShuffleEnvironment shuffleEnvironment = + new NettyShuffleEnvironmentBuilder().build()) { + Task task = taskProvider.apply(shuffleEnvironment); + + task.startTaskThread(); + task.getExecutingThread().join(); + + assertEquals(ExecutionState.FAILED, task.getExecutionState()); + for (Thread thread : Thread.getAllStackTraces().keySet()) { + assertThat( + thread.getName(), + CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME)))); + } + } + } + + private TestTaskBuilder taskBuilderWithConfiguredRecordWriter( + NettyShuffleEnvironment shuffleEnvironment) { + Configuration taskConfiguration = new Configuration(); + outputEdgeConfiguration(taskConfiguration); + + ResultPartitionDeploymentDescriptor descriptor = + new ResultPartitionDeploymentDescriptor( + PartitionDescriptorBuilder.newBuilder().build(), + NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), + 1, + false); + return new TestTaskBuilder(shuffleEnvironment) + .setInvokable(NoOpStreamTask.class) + .setTaskConfig(taskConfiguration) + .setResultPartitions(singletonList(descriptor)); + } + + /** + * Make sure that there is some output edge in the config so that some RecordWriter is created. + */ + private void outputEdgeConfiguration(Configuration taskConfiguration) { + StreamConfig streamConfig = new StreamConfig(taskConfiguration); + streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory()); + + StreamConfigChainer cfg = + new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1); + // The OutputFlusher thread is started only if the buffer timeout more than 0(default value + // is 0). + cfg.setBufferTimeout(1); + cfg.chain( + new OperatorID(44, 44), + new UnusedOperatorFactory(), + StringSerializer.INSTANCE, + StringSerializer.INSTANCE, + false); + cfg.finish(); + } + + // ------------------------------------------------------------------------ + // Test Utilities + // ------------------------------------------------------------------------ + + /** + * Operator that does nothing. + * + * @param <T> + * @param <OP> + */ + public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> { + + public NoOpStreamTask(Environment environment) throws Exception { + super(environment); + } + + @Override + protected void init() throws Exception { + inputProcessor = new StreamTaskTest.EmptyInputProcessor(); + } + + @Override + protected void cleanUpInternal() throws Exception {} + } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + + private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> { + @Override + public <T extends StreamOperator<String>> T createStreamOperator( + StreamOperatorParameters<String> parameters) { + throw new UnsupportedOperationException("This shouldn't be called"); + } + + @Override + public void setChainingStrategy(ChainingStrategy strategy) {} + + @Override + public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 6e51e7a61d7..6a2d576c8a6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTim import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.SavepointFormatType; @@ -45,7 +44,6 @@ import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.ExecutionState; @@ -66,7 +64,6 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; -import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -102,7 +99,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TestTaskBuilder; import org.apache.flink.runtime.throughput.ThroughputCalculator; -import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -110,15 +106,12 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.StreamOperatorStateContext; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; @@ -134,17 +127,14 @@ import org.apache.flink.util.CloseableIterable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FatalExitExceptionHandler; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.TestingUncaughtExceptionHandler; import org.apache.flink.util.function.BiConsumerWithException; -import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.SupplierWithException; -import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.ClassRule; @@ -183,7 +173,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND; import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_ENABLED; @@ -193,15 +182,12 @@ import static org.apache.flink.configuration.TaskManagerOptions.BUFFER_DEBLOAT_T import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_SEGMENT_SIZE; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE; import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton; -import static org.apache.flink.runtime.io.network.api.writer.RecordWriter.DEFAULT_OUTPUT_FLUSH_THREAD_NAME; import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY; import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning; import static org.apache.flink.util.Preconditions.checkState; import static org.hamcrest.CoreMatchers.both; -import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -1183,110 +1169,6 @@ public class StreamTaskTest extends TestLogger { } } - @Test - public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception { - testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING); - } - - @Test - public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception { - testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING); - } - - @Test - public void testRecordWriterClosedOnTransitRunningStateError() throws Exception { - testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING); - } - - private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState) - throws Exception { - // Throw the exception when the state updating to the expected one. - NoOpTaskManagerActions taskManagerActions = - new NoOpTaskManagerActions() { - @Override - public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { - if (taskExecutionState.getExecutionState() == executionState) { - throw new ExpectedTestException(); - } - } - }; - - testRecordWriterClosedOnError( - env -> - taskBuilderWithConfiguredRecordWriter(env) - .setTaskManagerActions(taskManagerActions) - .build(EXECUTOR_RESOURCE.getExecutor())); - } - - @Test - public void testFailInEndOfConstructor() throws Exception { - Configuration conf = new Configuration(); - // Set the wrong setting type for forcing the fail during read. - conf.setString(BUFFER_DEBLOAT_PERIOD.key(), "a"); - testRecordWriterClosedOnError( - env -> - taskBuilderWithConfiguredRecordWriter(env) - .setTaskManagerConfig(conf) - .build(EXECUTOR_RESOURCE.getExecutor())); - } - - private void testRecordWriterClosedOnError( - FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider) - throws Exception { - try (NettyShuffleEnvironment shuffleEnvironment = - new NettyShuffleEnvironmentBuilder().build()) { - Task task = taskProvider.apply(shuffleEnvironment); - - task.startTaskThread(); - task.getExecutingThread().join(); - - assertEquals(ExecutionState.FAILED, task.getExecutionState()); - for (Thread thread : Thread.getAllStackTraces().keySet()) { - assertThat( - thread.getName(), - CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME)))); - } - } - } - - private TestTaskBuilder taskBuilderWithConfiguredRecordWriter( - NettyShuffleEnvironment shuffleEnvironment) { - Configuration taskConfiguration = new Configuration(); - outputEdgeConfiguration(taskConfiguration); - - ResultPartitionDeploymentDescriptor descriptor = - new ResultPartitionDeploymentDescriptor( - PartitionDescriptorBuilder.newBuilder().build(), - NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), - 1, - false); - return new TestTaskBuilder(shuffleEnvironment) - .setInvokable(NoOpStreamTask.class) - .setTaskConfig(taskConfiguration) - .setResultPartitions(singletonList(descriptor)); - } - - /** - * Make sure that there is some output edge in the config so that some RecordWriter is created. - */ - private void outputEdgeConfiguration(Configuration taskConfiguration) { - StreamConfig streamConfig = new StreamConfig(taskConfiguration); - streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory()); - - StreamConfigChainer cfg = - new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1); - // The OutputFlusher thread is started only if the buffer timeout more than 0(default value - // is 0). - cfg.setBufferTimeout(1); - cfg.chain( - new OperatorID(44, 44), - new UnusedOperatorFactory(), - StringSerializer.INSTANCE, - StringSerializer.INSTANCE, - false); - cfg.finish(); - } - @Test public void testProcessWithAvailableOutput() throws Exception { try (final MockEnvironment environment = setupEnvironment(true, true)) { @@ -1997,27 +1879,6 @@ public class StreamTaskTest extends TestLogger { return new RunningTask<>(taskCreationFuture.get(), invocationFuture); } - /** - * Operator that does nothing. - * - * @param <T> - * @param <OP> - */ - public static class NoOpStreamTask<T, OP extends StreamOperator<T>> extends StreamTask<T, OP> { - - public NoOpStreamTask(Environment environment) throws Exception { - super(environment); - } - - @Override - protected void init() throws Exception { - inputProcessor = new EmptyInputProcessor(); - } - - @Override - protected void cleanUpInternal() throws Exception {} - } - /** * A stream input processor implementation used to control the returned input status based on * the total number of processing calls. @@ -2083,31 +1944,6 @@ public class StreamTaskTest extends TestLogger { } } - private static class BlockingFinishStreamOperator extends AbstractStreamOperator<Void> { - private static final long serialVersionUID = -9042150529568008847L; - - private static volatile OneShotLatch inFinish; - private static volatile OneShotLatch finishClose; - - @Override - public void finish() throws Exception { - checkLatches(); - inFinish.trigger(); - finishClose.await(); - super.close(); - } - - private void checkLatches() { - Preconditions.checkNotNull(inFinish); - Preconditions.checkNotNull(finishClose); - } - - private static void resetLatches() { - inFinish = new OneShotLatch(); - finishClose = new OneShotLatch(); - } - } - public static Task createTask( Class<? extends TaskInvokable> invokable, ShuffleEnvironment shuffleEnvironment, @@ -2241,7 +2077,7 @@ public class StreamTaskTest extends TestLogger { } } - private static class EmptyInputProcessor implements StreamInputProcessor { + static class EmptyInputProcessor implements StreamInputProcessor { private volatile boolean isFinished; public EmptyInputProcessor() { @@ -2688,22 +2524,6 @@ public class StreamTaskTest extends TestLogger { } } - private static class UnusedOperatorFactory extends AbstractStreamOperatorFactory<String> { - @Override - public <T extends StreamOperator<String>> T createStreamOperator( - StreamOperatorParameters<String> parameters) { - throw new UnsupportedOperationException("This shouldn't be called"); - } - - @Override - public void setChainingStrategy(ChainingStrategy strategy) {} - - @Override - public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) { - throw new UnsupportedOperationException(); - } - } - private static class ClosingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> { static AtomicBoolean closed = new AtomicBoolean(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index 246814baa1e..ef37551a02f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -62,7 +62,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.NoOpStreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.MockStreamTaskBuilder; import org.apache.flink.util.ExceptionUtils; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java index 6af59484f55..4a820687a50 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; -import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.NoOpStreamTask; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; import org.junit.Before; diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java index cabdc0c3bd5..102293c7497 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskITCase.NoOpStreamTask; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.ExceptionUtils; diff --git a/pom.xml b/pom.xml index 00384ec13be..6e3b0f90b9e 100644 --- a/pom.xml +++ b/pom.xml @@ -1594,7 +1594,6 @@ under the License. <test.randomization.seed>${test.randomization.seed}</test.randomization.seed> <junit.jupiter.extensions.autodetection.enabled>true</junit.jupiter.extensions.autodetection.enabled> </systemPropertyVariables> - <argLine>-Xms256m -Xmx2048m -XX:+UseG1GC</argLine> </configuration> <executions> <!--execute all the unit tests--> @@ -1608,6 +1607,7 @@ under the License. <includes> <include>${test.unit.pattern}</include> </includes> + <argLine>-Xms256m -Xmx1792m -XX:+UseG1GC</argLine> </configuration> </execution> <!--execute all the integration tests--> @@ -1627,6 +1627,7 @@ under the License. e.g., 'org.apache.flink.api.scala.typeutils.Foo$Bar$Foobar'. --> <exclude>**/*$*</exclude> </excludes> + <argLine>-Xms256m -Xmx2048m -XX:+UseG1GC</argLine> <reuseForks>false</reuseForks> </configuration> </execution>
