This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c83792b0f3f [FLINK-33317][runtime] Add cleaning mechanism for initial configs to reduce the memory usage
c83792b0f3f is described below

commit c83792b0f3fb6ae18dfabd52a89dc42c074ee73c
Author: Rui Fan <[email protected]>
AuthorDate: Fri Oct 27 10:53:15 2023 +0800

    [FLINK-33317][runtime] Add cleaning mechanism for initial configs to reduce the memory usage
    
    Address Piotr's comment
    
    Refactor the test to avoid create RegularOperatorChain repeatedly
---
 .../flink/streaming/api/graph/StreamConfig.java    | 31 +++++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  1 +
 .../flink/streaming/graph/StreamConfigTest.java    | 65 ++++++++++++++++++++++
 .../tasks/SubtaskCheckpointCoordinatorTest.java    | 24 +++++---
 .../flink/streaming/util/MockStreamConfig.java     | 14 +++++
 5 files changed, 126 insertions(+), 9 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 79a5e904ea9..91ed4ce3b47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -52,6 +52,7 @@ import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -151,6 +152,13 @@ public class StreamConfig implements Serializable {
     private final transient CompletableFuture<StreamConfig> serializationFuture =
             new CompletableFuture<>();
 
+    /**
+     * In order to release memory during processing data, some keys are removed in {@link
+     * #clearInitialConfigs()}. Recording these keys here to prevent they are accessed after
+     * removing.
+     */
+    private final Set<String> removedKeys = new HashSet<>();
+
     public StreamConfig(Configuration config) {
         this.config = config;
     }
@@ -386,6 +394,9 @@ public class StreamConfig implements Serializable {
 
     public <T extends StreamOperatorFactory<?>> T getStreamOperatorFactory(ClassLoader cl) {
         try {
+            checkState(
+                    !removedKeys.contains(SERIALIZED_UDF),
+                    String.format("%s has been removed.", SERIALIZED_UDF));
             return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF, cl);
         } catch (ClassNotFoundException e) {
             String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
@@ -584,6 +595,9 @@ public class StreamConfig implements Serializable {
 
     public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
         try {
+            checkState(
+                    !removedKeys.contains(CHAINED_TASK_CONFIG),
+                    String.format("%s has been removed.", CHAINED_TASK_CONFIG));
             Map<Integer, StreamConfig> confs =
                     InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
             return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
@@ -795,6 +809,23 @@ public class StreamConfig implements Serializable {
         return config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
     }
 
+    /**
+     * In general, we don't clear any configuration. However, the {@link #SERIALIZED_UDF} may be
+     * very large when operator includes some large objects, the SERIALIZED_UDF is used to create a
+     * StreamOperator and usually only needs to be called once. {@link #CHAINED_TASK_CONFIG} may be
+     * large as well due to the StreamConfig of all non-head operators in OperatorChain will be
+     * serialized and stored in CHAINED_TASK_CONFIG. They can be cleared to reduce the memory after
+     * StreamTask is initialized. If so, TM will have more memory during running. See FLINK-33315
+     * and FLINK-33317 for more information.
+     */
+    public void clearInitialConfigs() {
+        removedKeys.add(SERIALIZED_UDF);
+        config.removeKey(SERIALIZED_UDF);
+
+        removedKeys.add(CHAINED_TASK_CONFIG);
+        config.removeKey(CHAINED_TASK_CONFIG);
+    }
+
     /**
      * Requirements of the different inputs of an operator. Each input can have a different
      * requirement. For all {@link #SORTED} inputs, records are sorted/grouped by key and all
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 39b460337dd..eef96812fc4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -743,6 +743,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
         // task specific initialization
         init();
+        configuration.clearInitialConfigs();
 
         // save the work of reloading state, etc, if the task is already canceled
         ensureNotCanceled();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamConfigTest.java
new file mode 100644
index 00000000000..d3eb45f1990
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.util.MockStreamConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link StreamConfig}. */
+class StreamConfigTest {
+
+    @Test
+    void testClearInitialConfigs() {
+        int chainedTaskId = 3456;
+        MockStreamConfig streamConfig =
+                new MockStreamConfig(
+                        new Configuration(),
+                        1,
+                        Collections.singletonMap(
+                                chainedTaskId, new MockStreamConfig(new Configuration(), 1)));
+
+        ClassLoader classLoader = getClass().getClassLoader();
+        StreamOperatorFactory<?> streamOperatorFactory =
+                streamConfig.getStreamOperatorFactory(classLoader);
+        assertThat(streamOperatorFactory).isNotNull();
+        assertThat(streamConfig.getStreamOperatorFactoryClass(classLoader)).isNotNull();
+        assertThat(streamConfig.getTransitiveChainedTaskConfigs(classLoader))
+                .hasSize(1)
+                .containsKey(chainedTaskId);
+
+        // StreamOperatorFactory and ChainedTaskConfigs should be cleared after clearInitialConfigs,
+        // but the factory class shouldn't be cleared.
+        streamConfig.clearInitialConfigs();
+        assertThatThrownBy(() -> streamConfig.getStreamOperatorFactory(classLoader))
+                .hasCauseInstanceOf(IllegalStateException.class)
+                .hasRootCauseMessage("serializedUDF has been removed.");
+        assertThat(streamConfig.getStreamOperatorFactoryClass(classLoader)).isNotNull();
+        assertThatThrownBy(() -> streamConfig.getTransitiveChainedTaskConfigs(classLoader))
+                .hasCauseInstanceOf(IllegalStateException.class)
+                .hasRootCauseMessage("chainedTaskConfig_ has been removed.");
+    }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 5bb1110922b..2cba71b4e90 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DoneFuture;
@@ -72,7 +73,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -389,10 +389,15 @@ public class SubtaskCheckpointCoordinatorTest {
         StreamConfig streamConfig = testHarness.getStreamConfig();
         streamConfig.setStreamOperator(new MapOperator());
 
-        testHarness.invoke();
-        testHarness.waitForTaskRunning();
-
-        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
+        StreamMockEnvironment mockEnvironment =
+                new StreamMockEnvironment(
+                        testHarness.jobConfig,
+                        testHarness.taskConfig,
+                        testHarness.executionConfig,
+                        testHarness.memorySize,
+                        new MockInputSplitProvider(),
+                        testHarness.bufferSize,
+                        testHarness.taskStateManager);
 
         try (SubtaskCheckpointCoordinator subtaskCheckpointCoordinator =
                 new MockSubtaskCheckpointCoordinatorBuilder()
@@ -404,13 +409,14 @@ public class SubtaskCheckpointCoordinatorTest {
             ResultPartitionWriter resultPartitionWriter =
                     new RecordOrEventCollectingResultPartitionWriter<>(
                             recordOrEvents, stringStreamElementSerializer);
-            mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
+            mockEnvironment.addOutput(resultPartitionWriter);
+
+            testHarness.invoke(mockEnvironment);
+            testHarness.waitForTaskRunning();
 
             OneInputStreamTask<String, String> task = testHarness.getTask();
             OperatorChain<String, OneInputStreamOperator<String, String>> operatorChain =
-                    new RegularOperatorChain<>(
-                            task,
-                            StreamTask.createRecordWriterDelegate(streamConfig, mockEnvironment));
+                    task.operatorChain;
             long checkpointId = 42L;
             // notify checkpoint aborted before execution.
             subtaskCheckpointCoordinator.notifyCheckpointAborted(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index 3080e55f512..ec35ce681e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -30,13 +30,24 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /** A dummy stream config implementation for specifying the number of outputs in tests. */
 public class MockStreamConfig extends StreamConfig {
 
     public MockStreamConfig(Configuration configuration, int numberOfOutputs) {
+        this(configuration, numberOfOutputs, null);
+    }
+
+    public MockStreamConfig(
+            Configuration configuration,
+            int numberOfOutputs,
+            @Nullable Map<Integer, StreamConfig> chainedTaskConfigs) {
+
         super(configuration);
 
         setChainStart();
@@ -71,6 +82,9 @@ public class MockStreamConfig extends StreamConfig {
         }
         setVertexNonChainedOutputs(streamOutputs);
         setOperatorNonChainedOutputs(streamOutputs);
+        if (chainedTaskConfigs != null) {
+            setAndSerializeTransitiveChainedTaskConfigs(chainedTaskConfigs);
+        }
         serializeAllConfigs();
     }
 }

Reply via email to