KarmaGYZ commented on a change in pull request #18360:
URL: https://github.com/apache/flink/pull/18360#discussion_r800388531



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
  * {@link ExecutionGraphInfoStore} implementation which stores the {@link 
ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store support to keep maximum jobs with given {@link
+ * MemoryExecutionGraphInfoStore#maximumCapacity}, and start a periodic task 
to check and remove the
+ * timeout graphs.
  */
 public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos =
+            new ConcurrentHashMap<>(4);
+
+    /** Job id with creation timestamp. */
+    private final Queue<JobGraphTimestamp> jobGraphQueue = new 
ConcurrentLinkedQueue<>();
+
+    /** Expiration time of execution graphs in memory while 0 means they will 
never expire. */
+    private final long expirationMills;
+
+    /** Capacity of the memory store, 0 means unlimited. */
+    private final int maximumCapacity;
+
+    private final ScheduledExecutor scheduledExecutor;

Review comment:
       ```suggestion
       @Nullable private final ScheduledExecutor scheduledExecutor;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
  * {@link ExecutionGraphInfoStore} implementation which stores the {@link 
ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store support to keep maximum jobs with given {@link
+ * MemoryExecutionGraphInfoStore#maximumCapacity}, and start a periodic task 
to check and remove the
+ * timeout graphs.
  */
 public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos =
+            new ConcurrentHashMap<>(4);

Review comment:
       ```suggestion
               new ConcurrentHashMap<>();
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
  * {@link ExecutionGraphInfoStore} implementation which stores the {@link 
ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store support to keep maximum jobs with given {@link
+ * MemoryExecutionGraphInfoStore#maximumCapacity}, and start a periodic task 
to check and remove the
+ * timeout graphs.
  */
 public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos =
+            new ConcurrentHashMap<>(4);
+
+    /** Job id with creation timestamp. */
+    private final Queue<JobGraphTimestamp> jobGraphQueue = new 
ConcurrentLinkedQueue<>();

Review comment:
       I address it myself in 
https://github.com/KarmaGYZ/flink/tree/FLINK_25329_support_memory_store_in_session
 . You can take a look and fix the test issue.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
  * {@link ExecutionGraphInfoStore} implementation which stores the {@link 
ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store support to keep maximum jobs with given {@link
+ * MemoryExecutionGraphInfoStore#maximumCapacity}, and start a periodic task 
to check and remove the
+ * timeout graphs.
  */
 public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos =
+            new ConcurrentHashMap<>(4);
+
+    /** Job id with creation timestamp. */
+    private final Queue<JobGraphTimestamp> jobGraphQueue = new 
ConcurrentLinkedQueue<>();
+
+    /** Expiration time of execution graphs in memory while 0 means they will 
never expire. */
+    private final long expirationMills;
+
+    /** Capacity of the memory store, 0 means unlimited. */
+    private final int maximumCapacity;
+
+    private final ScheduledExecutor scheduledExecutor;
+    private ScheduledFuture<?> cleanupFuture;

Review comment:
       ```suggestion
       @Nullable private ScheduledFuture<?> cleanupFuture;
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -314,7 +314,32 @@
             key("jobstore.max-capacity")
                     .defaultValue(Integer.MAX_VALUE)
                     .withDescription(
-                            "The max number of completed jobs that can be kept 
in the job store.");
+                            "The max number of completed jobs that can be kept 
in the job store. "
+                                    + "NOTICE: if memory store keeps too many 
jobs in session cluster, it may cause FullGC or OOM in jm.");
+
+    /** Config parameter determining the job store implementation in session 
cluster. */
+    @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+    public static final ConfigOption<JobStoreType> JOB_STORE_TYPE =
+            key("jobstore.type")
+                    .enumType(JobStoreType.class)
+                    .defaultValue(JobStoreType.File)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Determines which job store 
implementation is used in session cluster. Accepted values are:")
+                                    .list(
+                                            text(
+                                                    "'File': the file job 
store keeps the archived execution graphs in files"),
+                                            text(
+                                                    "'Memory': the memory job 
store keeps the archived execution graphs in memory and "
+                                                            + " it may cause 
FullGC or OOM when there are too many graphs"))

Review comment:
       ```suggestion
                                                       "'Memory': the memory 
job store keeps the archived execution graphs in memory. You "
                                                               + "may need to 
limit the %s to mitigate FullGC or OOM when there are too many graphs",
                                                               
code(JOB_STORE_MAX_CAPACITY.key())))
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
  * {@link ExecutionGraphInfoStore} implementation which stores the {@link 
ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store support to keep maximum jobs with given {@link
+ * MemoryExecutionGraphInfoStore#maximumCapacity}, and start a periodic task 
to check and remove the
+ * timeout graphs.
  */
 public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
 
-    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos = new HashMap<>(4);
+    private final Map<JobID, ExecutionGraphInfo> 
serializableExecutionGraphInfos =
+            new ConcurrentHashMap<>(4);
+
+    /** Job id with creation timestamp. */
+    private final Queue<JobGraphTimestamp> jobGraphQueue = new 
ConcurrentLinkedQueue<>();

Review comment:
       I think we can leverage the guava cache to handle the cleanup, 
expiration and concurrent access. Please refer to how we use it in 
`FileExecutionGraphInfoStore`. We can maintain a `Cache<JobID, 
ExecutionGraphInfo> serializableExecutionGraphInfos`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to