KarmaGYZ commented on a change in pull request #18360:
URL: https://github.com/apache/flink/pull/18360#discussion_r800388531
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store keeps at most {@link MemoryExecutionGraphInfoStore#maximumCapacity}
+ * jobs and starts a periodic task to check for and remove expired graphs.
*/
public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
+ private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos =
+ new ConcurrentHashMap<>(4);
+
+ /** Job id with creation timestamp. */
+ private final Queue<JobGraphTimestamp> jobGraphQueue = new ConcurrentLinkedQueue<>();
+
+ /** Expiration time of execution graphs in memory; 0 means they never expire. */
+ private final long expirationMills;
+
+ /** Capacity of the memory store, 0 means unlimited. */
+ private final int maximumCapacity;
+
+ private final ScheduledExecutor scheduledExecutor;
Review comment:
```suggestion
@Nullable private final ScheduledExecutor scheduledExecutor;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store keeps at most {@link MemoryExecutionGraphInfoStore#maximumCapacity}
+ * jobs and starts a periodic task to check for and remove expired graphs.
*/
public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
+ private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos =
+ new ConcurrentHashMap<>(4);
Review comment:
```suggestion
new ConcurrentHashMap<>();
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store keeps at most {@link MemoryExecutionGraphInfoStore#maximumCapacity}
+ * jobs and starts a periodic task to check for and remove expired graphs.
*/
public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
+ private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos =
+ new ConcurrentHashMap<>(4);
+
+ /** Job id with creation timestamp. */
+ private final Queue<JobGraphTimestamp> jobGraphQueue = new ConcurrentLinkedQueue<>();
Review comment:
I addressed it myself in
https://github.com/KarmaGYZ/flink/tree/FLINK_25329_support_memory_store_in_session.
You can take a look and fix the test issue.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store keeps at most {@link MemoryExecutionGraphInfoStore#maximumCapacity}
+ * jobs and starts a periodic task to check for and remove expired graphs.
*/
public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
+ private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos =
+ new ConcurrentHashMap<>(4);
+
+ /** Job id with creation timestamp. */
+ private final Queue<JobGraphTimestamp> jobGraphQueue = new ConcurrentLinkedQueue<>();
+
+ /** Expiration time of execution graphs in memory; 0 means they never expire. */
+ private final long expirationMills;
+
+ /** Capacity of the memory store, 0 means unlimited. */
+ private final int maximumCapacity;
+
+ private final ScheduledExecutor scheduledExecutor;
+ private ScheduledFuture<?> cleanupFuture;
Review comment:
```suggestion
@Nullable private ScheduledFuture<?> cleanupFuture;
```
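For context, a minimal sketch of the expiry logic these fields imply (the names `isExpired` and `cleanup` are illustrative, not from the PR): with `expirationMills == 0` meaning graphs never expire, the periodic task scheduled into `cleanupFuture` only needs to drop entries older than the threshold.

```java
// Hypothetical sketch of the expiry check behind expirationMills and the
// periodic cleanup task; method names are illustrative, not from the PR.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExpirationSketch {

    /** Mirrors the field's contract: 0 means execution graphs never expire. */
    static boolean isExpired(long creationTs, long now, long expirationMillis) {
        if (expirationMillis == 0) {
            return false;
        }
        return now - creationTs > expirationMillis;
    }

    /** The body the scheduled cleanup task would run: drop all expired entries. */
    static void cleanup(Map<String, Long> idToCreationTs, long now, long expirationMillis) {
        idToCreationTs.entrySet()
                .removeIf(e -> isExpired(e.getValue(), now, expirationMillis));
    }

    public static void main(String[] args) {
        Map<String, Long> store = new ConcurrentHashMap<>();
        store.put("job-1", 0L);  // created at t=0, expired by t=100 with a 50ms timeout
        store.put("job-2", 90L); // created at t=90, still alive at t=100
        cleanup(store, 100L, 50L);
        if (store.containsKey("job-1") || !store.containsKey("job-2")) {
            throw new AssertionError("cleanup removed the wrong entries");
        }
    }
}
```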
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -314,7 +314,32 @@
key("jobstore.max-capacity")
.defaultValue(Integer.MAX_VALUE)
.withDescription(
- "The max number of completed jobs that can be kept in the job store.");
+ "The max number of completed jobs that can be kept in the job store. "
+ + "NOTICE: if the memory store keeps too many jobs in a session cluster, it may cause FullGC or OOM in the JobManager.");
+
+ /** Config parameter determining the job store implementation in session cluster. */
+ @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+ public static final ConfigOption<JobStoreType> JOB_STORE_TYPE =
+ key("jobstore.type")
+ .enumType(JobStoreType.class)
+ .defaultValue(JobStoreType.File)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Determines which job store implementation is used in session cluster. Accepted values are:")
+ .list(
+ text(
+ "'File': the file job store keeps the archived execution graphs in files"),
+ text(
+ "'Memory': the memory job store keeps the archived execution graphs in memory and "
+ + "it may cause FullGC or OOM when there are too many graphs"))
Review comment:
```suggestion
"'Memory': the memory job store keeps the archived execution graphs in memory. You "
+ "may need to limit the %s to mitigate FullGC or OOM when there are too many graphs",
code(JOB_STORE_MAX_CAPACITY.key())))
```
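The suggested text uses Flink's `Description` builder, where the `%s` placeholder is filled by the `code(...)` element. As a rough stdlib analogy (the `Description` builder itself is Flink API and is not reproduced here), the substitution behaves like `String.format`:

```java
public class DescriptionFormatSketch {

    /**
     * Stdlib stand-in for the suggested Description text: Flink's builder fills
     * the %s placeholder with the code(...) element, emulated via String.format.
     */
    static String render(String maxCapacityKey) {
        return String.format(
                "'Memory': the memory job store keeps the archived execution graphs in memory. You "
                        + "may need to limit the %s to mitigate FullGC or OOM when there are too many graphs",
                maxCapacityKey);
    }

    public static void main(String[] args) {
        // "jobstore.max-capacity" is the key defined just above in the same hunk.
        System.out.println(render("jobstore.max-capacity"));
    }
}
```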
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MemoryExecutionGraphInfoStore.java
##########
@@ -20,26 +20,72 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link ExecutionGraphInfoStore} implementation which stores the {@link ArchivedExecutionGraph} in
- * memory.
+ * memory. The memory store keeps at most {@link MemoryExecutionGraphInfoStore#maximumCapacity}
+ * jobs and starts a periodic task to check for and remove expired graphs.
*/
public class MemoryExecutionGraphInfoStore implements ExecutionGraphInfoStore {
- private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos = new HashMap<>(4);
+ private final Map<JobID, ExecutionGraphInfo> serializableExecutionGraphInfos =
+ new ConcurrentHashMap<>(4);
+
+ /** Job id with creation timestamp. */
+ private final Queue<JobGraphTimestamp> jobGraphQueue = new ConcurrentLinkedQueue<>();
Review comment:
I think we can leverage the Guava cache to handle cleanup, expiration,
and concurrent access. Please refer to how we use it in
`FileExecutionGraphInfoStore`. We can maintain a `Cache<JobID,
ExecutionGraphInfo> serializableExecutionGraphInfos`.
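To illustrate the idea (without pulling in Guava here), the size bound the reviewer wants from `CacheBuilder.newBuilder().maximumSize(...)` can be sketched with a stdlib `LinkedHashMap`; this is only an analogy for the eviction behavior, not the actual `FileExecutionGraphInfoStore` code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedStoreSketch {

    /**
     * Insertion-ordered map that evicts its eldest entry once size exceeds
     * maximumCapacity: a stdlib analogue of the size bound that Guava's
     * CacheBuilder.maximumSize(...) would provide (Guava also adds
     * expiration and thread-safe access, which this sketch does not).
     */
    static <K, V> Map<K, V> boundedMap(int maximumCapacity) {
        return new LinkedHashMap<K, V>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumCapacity;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> store = boundedMap(2);
        store.put("job-1", "graph-1");
        store.put("job-2", "graph-2");
        store.put("job-3", "graph-3"); // evicts job-1, the oldest entry
        if (store.containsKey("job-1") || store.size() != 2) {
            throw new AssertionError("eviction did not cap the store at 2 entries");
        }
    }
}
```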
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]