This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c83792b0f3f [FLINK-33317][runtime] Add cleaning mechanism for initial
configs to reduce the memory usage
c83792b0f3f is described below
commit c83792b0f3fb6ae18dfabd52a89dc42c074ee73c
Author: Rui Fan <[email protected]>
AuthorDate: Fri Oct 27 10:53:15 2023 +0800
[FLINK-33317][runtime] Add cleaning mechanism for initial configs to reduce
the memory usage
Address Piotr's comment
Refactor the test to avoid creating a RegularOperatorChain repeatedly
---
.../flink/streaming/api/graph/StreamConfig.java | 31 +++++++++++
.../flink/streaming/runtime/tasks/StreamTask.java | 1 +
.../flink/streaming/graph/StreamConfigTest.java | 65 ++++++++++++++++++++++
.../tasks/SubtaskCheckpointCoordinatorTest.java | 24 +++++---
.../flink/streaming/util/MockStreamConfig.java | 14 +++++
5 files changed, 126 insertions(+), 9 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 79a5e904ea9..91ed4ce3b47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -52,6 +52,7 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -151,6 +152,13 @@ public class StreamConfig implements Serializable {
private final transient CompletableFuture<StreamConfig>
serializationFuture =
new CompletableFuture<>();
+ /**
+ * Keys that {@link #clearInitialConfigs()} has removed from {@code config} to release memory
+ * while the task processes data. The getters for those keys check this set and throw an
+ * IllegalStateException instead of behaving as if the entry had never been set.
+ */
+ private final Set<String> removedKeys = new HashSet<>();
+
public StreamConfig(Configuration config) {
this.config = config;
}
@@ -386,6 +394,9 @@ public class StreamConfig implements Serializable {
public <T extends StreamOperatorFactory<?>> T
getStreamOperatorFactory(ClassLoader cl) {
try {
+ checkState(
+ !removedKeys.contains(SERIALIZED_UDF),
+ String.format("%s has been removed.", SERIALIZED_UDF));
return InstantiationUtil.readObjectFromConfig(this.config,
SERIALIZED_UDF, cl);
} catch (ClassNotFoundException e) {
String classLoaderInfo =
ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
@@ -584,6 +595,9 @@ public class StreamConfig implements Serializable {
public Map<Integer, StreamConfig>
getTransitiveChainedTaskConfigs(ClassLoader cl) {
try {
+ checkState(
+ !removedKeys.contains(CHAINED_TASK_CONFIG),
+ String.format("%s has been removed.",
CHAINED_TASK_CONFIG));
Map<Integer, StreamConfig> confs =
InstantiationUtil.readObjectFromConfig(this.config,
CHAINED_TASK_CONFIG, cl);
return confs == null ? new HashMap<Integer, StreamConfig>() :
confs;
@@ -795,6 +809,23 @@ public class StreamConfig implements Serializable {
return config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
}
+ /**
+ * Drops entries that are only needed while the StreamTask is being initialized, so that the
+ * TaskManager does not keep them in memory while the task is running. In general nothing in a
+ * StreamConfig is cleared, but {@link #SERIALIZED_UDF} (the serialized operator, usually
+ * deserialized only once, and very large if the operator holds large objects) and {@link
+ * #CHAINED_TASK_CONFIG} (the serialized StreamConfigs of all non-head operators in the
+ * OperatorChain) can both be big. Every dropped key is remembered so that the guarded getters
+ * reject later reads. See FLINK-33315 and FLINK-33317 for more information.
+ */
+ public void clearInitialConfigs() {
+ // Record each key before dropping it so the guarded getters can report the removal.
+ for (String key : new String[] {SERIALIZED_UDF, CHAINED_TASK_CONFIG}) {
+ removedKeys.add(key);
+ config.removeKey(key);
+ }
+ }
+
/**
* Requirements of the different inputs of an operator. Each input can
have a different
* requirement. For all {@link #SORTED} inputs, records are sorted/grouped
by key and all
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 39b460337dd..eef96812fc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -743,6 +743,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// task specific initialization
init();
+ configuration.clearInitialConfigs();
// save the work of reloading state, etc, if the task is already
canceled
ensureNotCanceled();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamConfigTest.java
new file mode 100644
index 00000000000..d3eb45f1990
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamConfigTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.util.MockStreamConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link StreamConfig}. */
+class StreamConfigTest {
+
+    @Test
+    void testClearInitialConfigs() {
+        int chainedTaskId = 3456;
+        // Keep a reference to the backing Configuration to inspect what it physically stores.
+        Configuration configuration = new Configuration();
+        MockStreamConfig streamConfig =
+                new MockStreamConfig(
+                        configuration,
+                        1,
+                        Collections.singletonMap(
+                                chainedTaskId, new MockStreamConfig(new Configuration(), 1)));
+
+        ClassLoader classLoader = getClass().getClassLoader();
+        StreamOperatorFactory<?> streamOperatorFactory =
+                streamConfig.getStreamOperatorFactory(classLoader);
+        assertThat(streamOperatorFactory).isNotNull();
+        assertThat(streamConfig.getStreamOperatorFactoryClass(classLoader)).isNotNull();
+        assertThat(streamConfig.getTransitiveChainedTaskConfigs(classLoader))
+                .hasSize(1)
+                .containsKey(chainedTaskId);
+        assertThat(configuration.keySet()).contains("serializedUDF", "chainedTaskConfig_");
+
+        // clearInitialConfigs() must physically drop the serialized operator factory and the
+        // chained task configs (otherwise no memory is released) and make later reads of them
+        // fail, while the operator factory class stays readable.
+        streamConfig.clearInitialConfigs();
+        assertThat(configuration.keySet()).doesNotContain("serializedUDF", "chainedTaskConfig_");
+        assertThatThrownBy(() -> streamConfig.getStreamOperatorFactory(classLoader))
+                .hasCauseInstanceOf(IllegalStateException.class)
+                .hasRootCauseMessage("serializedUDF has been removed.");
+        assertThat(streamConfig.getStreamOperatorFactoryClass(classLoader)).isNotNull();
+        assertThatThrownBy(() -> streamConfig.getTransitiveChainedTaskConfigs(classLoader))
+                .hasCauseInstanceOf(IllegalStateException.class)
+                .hasRootCauseMessage("chainedTaskConfig_ has been removed.");
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 5bb1110922b..2cba71b4e90 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
@@ -72,7 +73,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -389,10 +389,15 @@ public class SubtaskCheckpointCoordinatorTest {
StreamConfig streamConfig = testHarness.getStreamConfig();
streamConfig.setStreamOperator(new MapOperator());
- testHarness.invoke();
- testHarness.waitForTaskRunning();
-
- MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+ StreamMockEnvironment mockEnvironment =
+ new StreamMockEnvironment(
+ testHarness.jobConfig,
+ testHarness.taskConfig,
+ testHarness.executionConfig,
+ testHarness.memorySize,
+ new MockInputSplitProvider(),
+ testHarness.bufferSize,
+ testHarness.taskStateManager);
try (SubtaskCheckpointCoordinator subtaskCheckpointCoordinator =
new MockSubtaskCheckpointCoordinatorBuilder()
@@ -404,13 +409,14 @@ public class SubtaskCheckpointCoordinatorTest {
ResultPartitionWriter resultPartitionWriter =
new RecordOrEventCollectingResultPartitionWriter<>(
recordOrEvents, stringStreamElementSerializer);
-
mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
+ mockEnvironment.addOutput(resultPartitionWriter);
+
+ testHarness.invoke(mockEnvironment);
+ testHarness.waitForTaskRunning();
OneInputStreamTask<String, String> task = testHarness.getTask();
OperatorChain<String, OneInputStreamOperator<String, String>>
operatorChain =
- new RegularOperatorChain<>(
- task,
-
StreamTask.createRecordWriterDelegate(streamConfig, mockEnvironment));
+ task.operatorChain;
long checkpointId = 42L;
// notify checkpoint aborted before execution.
subtaskCheckpointCoordinator.notifyCheckpointAborted(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index 3080e55f512..ec35ce681e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -30,13 +30,24 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/** A dummy stream config implementation for specifying the number of outputs
in tests. */
public class MockStreamConfig extends StreamConfig {
public MockStreamConfig(Configuration configuration, int numberOfOutputs) {
+ this(configuration, numberOfOutputs, null);
+ }
+
+ public MockStreamConfig(
+ Configuration configuration,
+ int numberOfOutputs,
+ @Nullable Map<Integer, StreamConfig> chainedTaskConfigs) {
+
super(configuration);
setChainStart();
@@ -71,6 +82,9 @@ public class MockStreamConfig extends StreamConfig {
}
setVertexNonChainedOutputs(streamOutputs);
setOperatorNonChainedOutputs(streamOutputs);
+ if (chainedTaskConfigs != null) {
+ setAndSerializeTransitiveChainedTaskConfigs(chainedTaskConfigs);
+ }
serializeAllConfigs();
}
}