[ 
https://issues.apache.org/jira/browse/FLINK-10066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578712#comment-16578712
 ] 

ASF GitHub Bot commented on FLINK-10066:
----------------------------------------

asfgit closed pull request #6500: [FLINK-10066] Keep only archived version of 
previous executions
URL: https://github.com/apache/flink/pull/6500
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 4b1c62fe707..ab8c94c0188 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -40,6 +41,8 @@
 
        private final TaskManagerLocation assignedResourceLocation; // for the 
archived execution
 
+       private final AllocationID assignedAllocationID;
+
        /* Continuously updated map of user-defined accumulators */
        private final StringifiedAccumulatorResult[] userAccumulators;
 
@@ -48,21 +51,24 @@
        private final IOMetrics ioMetrics;
 
        public ArchivedExecution(Execution execution) {
-               this.userAccumulators = 
execution.getUserAccumulatorsStringified();
-               this.attemptId = execution.getAttemptId();
-               this.attemptNumber = execution.getAttemptNumber();
-               this.stateTimestamps = execution.getStateTimestamps();
-               this.parallelSubtaskIndex = 
execution.getVertex().getParallelSubtaskIndex();
-               this.state = execution.getState();
-               this.failureCause = 
ExceptionUtils.stringifyException(execution.getFailureCause());
-               this.assignedResourceLocation = 
execution.getAssignedResourceLocation();
-               this.ioMetrics = execution.getIOMetrics();
+               this(
+                       execution.getUserAccumulatorsStringified(),
+                       execution.getIOMetrics(),
+                       execution.getAttemptId(),
+                       execution.getAttemptNumber(),
+                       execution.getState(),
+                       
ExceptionUtils.stringifyException(execution.getFailureCause()),
+                       execution.getAssignedResourceLocation(),
+                       execution.getAssignedAllocationID(),
+                       execution.getVertex().getParallelSubtaskIndex(),
+                       execution.getStateTimestamps());
        }
 
        public ArchivedExecution(
                        StringifiedAccumulatorResult[] userAccumulators, 
IOMetrics ioMetrics,
                        ExecutionAttemptID attemptId, int attemptNumber, 
ExecutionState state, String failureCause,
-                       TaskManagerLocation assignedResourceLocation, int 
parallelSubtaskIndex, long[] stateTimestamps) {
+                       TaskManagerLocation assignedResourceLocation, 
AllocationID assignedAllocationID,  int parallelSubtaskIndex,
+                       long[] stateTimestamps) {
                this.userAccumulators = userAccumulators;
                this.ioMetrics = ioMetrics;
                this.failureCause = failureCause;
@@ -72,6 +78,7 @@ public ArchivedExecution(
                this.state = state;
                this.stateTimestamps = stateTimestamps;
                this.parallelSubtaskIndex = parallelSubtaskIndex;
+               this.assignedAllocationID = assignedAllocationID;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -103,6 +110,10 @@ public TaskManagerLocation getAssignedResourceLocation() {
                return assignedResourceLocation;
        }
 
+       public AllocationID getAssignedAllocationID() {
+               return assignedAllocationID;
+       }
+
        @Override
        public String getFailureCauseAsString() {
                return failureCause;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 36669d34be7..04efa048fb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -40,7 +40,7 @@
 
        public ArchivedExecutionVertex(ExecutionVertex vertex) {
                this.subTaskIndex = vertex.getParallelSubtaskIndex();
-               this.priorExecutions = 
vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
+               this.priorExecutions = vertex.getCopyOfPriorExecutionsList();
                this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
                this.currentExecution = 
vertex.getCurrentExecutionAttempt().archive();
        }
@@ -101,17 +101,4 @@ public ArchivedExecution getPriorExecutionAttempt(int 
attemptNumber) {
                        throw new IllegalArgumentException("attempt does not 
exist");
                }
        }
-
-       // 
------------------------------------------------------------------------
-       //  utilities
-       // 
------------------------------------------------------------------------
-
-       private static final EvictingBoundedList.Function<Execution, 
ArchivedExecution> ARCHIVER =
-                       new EvictingBoundedList.Function<Execution, 
ArchivedExecution>() {
-
-               @Override
-               public ArchivedExecution apply(Execution value) {
-                       return value.archive();
-               }
-       };
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e385318b810..e4228011830 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -92,7 +92,7 @@
 
        private final int subTaskIndex;
 
-       private final EvictingBoundedList<Execution> priorExecutions;
+       private final EvictingBoundedList<ArchivedExecution> priorExecutions;
 
        private final Time timeout;
 
@@ -287,7 +287,7 @@ public TaskManagerLocation 
getCurrentAssignedResourceLocation() {
        }
 
        @Override
-       public Execution getPriorExecutionAttempt(int attemptNumber) {
+       public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
                synchronized (priorExecutions) {
                        if (attemptNumber >= 0 && attemptNumber < 
priorExecutions.size()) {
                                return priorExecutions.get(attemptNumber);
@@ -297,7 +297,7 @@ public Execution getPriorExecutionAttempt(int 
attemptNumber) {
                }
        }
 
-       public Execution getLatestPriorExecution() {
+       public ArchivedExecution getLatestPriorExecution() {
                synchronized (priorExecutions) {
                        final int size = priorExecutions.size();
                        if (size > 0) {
@@ -316,16 +316,16 @@ public Execution getLatestPriorExecution() {
         * @return The latest prior execution location, or null, if there is 
none, yet.
         */
        public TaskManagerLocation getLatestPriorLocation() {
-               Execution latestPriorExecution = getLatestPriorExecution();
+               ArchivedExecution latestPriorExecution = 
getLatestPriorExecution();
                return latestPriorExecution != null ? 
latestPriorExecution.getAssignedResourceLocation() : null;
        }
 
        public AllocationID getLatestPriorAllocation() {
-               Execution latestPriorExecution = getLatestPriorExecution();
+               ArchivedExecution latestPriorExecution = 
getLatestPriorExecution();
                return latestPriorExecution != null ? 
latestPriorExecution.getAssignedAllocationID() : null;
        }
 
-       EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+       EvictingBoundedList<ArchivedExecution> getCopyOfPriorExecutionsList() {
                synchronized (priorExecutions) {
                        return new EvictingBoundedList<>(priorExecutions);
                }
@@ -577,7 +577,7 @@ public Execution resetForNewExecution(final long timestamp, 
final long originati
                        final ExecutionState oldState = oldExecution.getState();
 
                        if (oldState.isTerminal()) {
-                               priorExecutions.add(oldExecution);
+                               priorExecutions.add(oldExecution.archive());
 
                                final Execution newExecution = new Execution(
                                        getExecutionGraph().getFutureExecutor(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index 2c5b6a9fbcb..9a529f3d40b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -20,6 +20,8 @@
 
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.io.Serializable;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
@@ -146,6 +148,7 @@ private T accessInternal(int arrayIndex) {
                return (T) elements[arrayIndex];
        }
 
+       @Nonnull
        @Override
        public Iterator<T> iterator() {
                return new Iterator<T>() {
@@ -176,50 +179,4 @@ public void remove() {
                        }
                };
        }
-
-       /**
-        * Creates a new list that replaces its elements with transformed 
elements.
-        * The list retains the same size and position-to-element mapping.
-        * 
-        * <p>Note that null values are automatically mapped to null values.
-        * 
-        * @param transform The function used to transform each element
-        * @param <R> The type of the elements in the result list.
-        * 
-        * @return The list with the mapped elements
-        */
-       public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
-               // map the default element
-               final R newDefault = defaultElement == null ? null : 
transform.apply(defaultElement);
-
-               // copy the list with the new default
-               final EvictingBoundedList<R> result = new 
EvictingBoundedList<>(elements.length, newDefault);
-               result.count = count;
-               result.idx = idx;
-
-               // map all the entries in the list
-               final int numElements = Math.min(elements.length, count);
-               for (int i = 0; i < numElements; i++) {
-                       result.elements[i] = transform.apply(accessInternal(i));
-               }
-
-               return result;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * A simple unary function that can be used to transform elements via 
the
-        * {@link EvictingBoundedList#map(Function)} method.
-        */
-       public interface Function<I, O> {
-
-               /**
-                * Transforms the value.
-                * 
-                * @param value The value to transform
-                * @return The transformed value
-                */
-               O apply(I value);
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index af8b995a3e6..99dcbf76305 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
@@ -93,6 +94,7 @@ public void testHandleRequest() throws Exception {
                timestamps[expectedState.ordinal()] = finishedTs;
 
                final LocalTaskManagerLocation assignedResourceLocation = new 
LocalTaskManagerLocation();
+               final AllocationID allocationID = new AllocationID();
 
                final int subtaskIndex = 1;
                final int attempt = 2;
@@ -104,6 +106,7 @@ public void testHandleRequest() throws Exception {
                        expectedState,
                        null,
                        assignedResourceLocation,
+                       allocationID,
                        subtaskIndex,
                        timestamps);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index df4ff04bf74..1099e4e71f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -99,6 +99,7 @@ public void testHandleRequest() throws Exception {
                        ExecutionState.FINISHED,
                        null,
                        null,
+                       null,
                        subtaskIndex,
                        new long[ExecutionState.values().length]);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 8e44c0e9731..22a7d774e8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -104,6 +104,7 @@ public void testHandleRequest() throws Exception {
                                                expectedState,
                                                null,
                                                null,
+                                               null,
                                                subtaskIndex,
                                                new 
long[ExecutionState.values().length]),
                                        new EvictingBoundedList<>(0)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
index ad5cd6be739..38c1d7ed982 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
@@ -21,6 +21,7 @@
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
@@ -42,6 +43,7 @@
        private ExecutionState state;
        private String failureCause;
        private TaskManagerLocation assignedResourceLocation;
+       private AllocationID assignedAllocationID;
        private StringifiedAccumulatorResult[] userAccumulators;
        private IOMetrics ioMetrics;
        private int parallelSubtaskIndex;
@@ -77,6 +79,11 @@ public ArchivedExecutionBuilder 
setAssignedResourceLocation(TaskManagerLocation
                return this;
        }
 
+       public ArchivedExecutionBuilder setAssignedAllocationID(AllocationID 
assignedAllocationID) {
+               this.assignedAllocationID = assignedAllocationID;
+               return this;
+       }
+
        public ArchivedExecutionBuilder 
setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
                this.userAccumulators = userAccumulators;
                return this;
@@ -101,6 +108,7 @@ public ArchivedExecution build() throws 
UnknownHostException {
                        state != null ? state : ExecutionState.FINISHED,
                        failureCause != null ? failureCause : "(null)",
                        assignedResourceLocation != null ? 
assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), 
InetAddress.getLocalHost(), 1234),
+                       assignedAllocationID != null ? assignedAllocationID : 
new AllocationID(0L, 0L),
                        parallelSubtaskIndex,
                        stateTimestamps != null ? stateTimestamps : new 
long[]{1, 2, 3, 4, 5, 5, 5, 5}
                );
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
index 92b0d8acee3..caf0fed2ae7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
@@ -108,11 +109,13 @@ private static void generateArchivedJob() throws 
Exception {
                StringifiedAccumulatorResult acc1 = new 
StringifiedAccumulatorResult("name1", "type1", "value1");
                StringifiedAccumulatorResult acc2 = new 
StringifiedAccumulatorResult("name2", "type2", "value2");
                TaskManagerLocation location = new TaskManagerLocation(new 
ResourceID("hello"), InetAddress.getLocalHost(), 1234);
+               AllocationID allocationID = new AllocationID(42L, 43L);
                originalAttempt = new ArchivedExecutionBuilder()
                        .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9})
                        .setParallelSubtaskIndex(1)
                        .setAttemptNumber(0)
                        .setAssignedResourceLocation(location)
+                       .setAssignedAllocationID(allocationID)
                        .setUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
                        .setState(ExecutionState.FINISHED)
                        .setFailureCause("attemptException")
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index 7109dacb20d..40bd0216153 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,12 +28,12 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
+/**
+ * Tests for {@link EvictingBoundedList}.
+ */
 public class EvictingBoundedListTest {
 
        @Test
@@ -164,96 +164,4 @@ public void testIterator() {
 
                }
        }
-
-       @Test
-       public void testMapWithHalfFullList() {
-               final Object[] originals = { new Object(), new Object(), new 
Object() };
-               final Object defaultValue = new Object();
-
-               final EvictingBoundedList<Object> original = new 
EvictingBoundedList<>(5, defaultValue);
-               for (Object o : originals) {
-                       original.add(o);
-               }
-
-               final EvictingBoundedList<TransformedObject> transformed = 
original.map(new Mapper());
-
-               assertEquals(original.size(), transformed.size());
-               assertEquals(original.getSizeLimit(), 
transformed.getSizeLimit());
-               assertEquals(defaultValue, 
transformed.getDefaultElement().original);
-
-               int i = 0;
-               for (TransformedObject to : transformed) {
-                       assertEquals(originals[i++], to.original); 
-               }
-
-               try {
-                       transformed.get(originals.length);
-                       fail("should have failed with an exception");
-               } catch (IndexOutOfBoundsException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testMapWithEvictedElements() {
-               final Object[] originals = { new Object(), new Object(), new 
Object(), new Object(), new Object() };
-               final Object defaultValue = new Object();
-
-               final EvictingBoundedList<Object> original = new 
EvictingBoundedList<>(2, defaultValue);
-               for (Object o : originals) {
-                       original.add(o);
-               }
-
-               final EvictingBoundedList<TransformedObject> transformed = 
original.map(new Mapper());
-
-               assertEquals(originals.length, transformed.size());
-               assertEquals(original.size(), transformed.size());
-               assertEquals(original.getSizeLimit(), 
transformed.getSizeLimit());
-               assertEquals(defaultValue, 
transformed.getDefaultElement().original);
-
-               for (int i = 0; i < originals.length; i++) {
-                       if (i < originals.length - transformed.getSizeLimit()) {
-                               assertEquals(transformed.getDefaultElement(), 
transformed.get(i));
-                       } else {
-                               assertEquals(originals[i], 
transformed.get(i).original);
-                       }
-               }
-
-               try {
-                       transformed.get(originals.length);
-                       fail("should have failed with an exception");
-               } catch (IndexOutOfBoundsException e) {
-                       // expected
-               }
-       }
-
-       @Test
-       public void testMapWithNullDefault() {
-               final EvictingBoundedList<Object> original = new 
EvictingBoundedList<>(5, null);
-               final EvictingBoundedList<TransformedObject> transformed = 
original.map(new Mapper());
-
-               assertEquals(original.size(), transformed.size());
-               assertNull(transformed.getDefaultElement());
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static final class TransformedObject {
-
-               final Object original;
-
-               TransformedObject(Object original) {
-                       this.original = checkNotNull(original);
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static final class Mapper implements 
EvictingBoundedList.Function<Object, TransformedObject> {
-
-               @Override
-               public TransformedObject apply(Object value) {
-                       return new TransformedObject(value);
-               }
-       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Keep only archived version of previous executions
> -------------------------------------------------
>
>                 Key: FLINK-10066
>                 URL: https://issues.apache.org/jira/browse/FLINK-10066
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager
>    Affects Versions: 1.4.3, 1.5.2, 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the execution vertex stores a limited number of previous 
> executions in a bounded list. This happens primarily for archiving purposes 
> and to remember previous locations and allocation ids. We remember the whole 
> execution to eventually convert it into an archived execution.
> This seems unnecessary and dangerous as we have observed that this strategy 
> is prone to memory leaks in the job manager. With a very high vertex count or 
> parallelism, remembering complete executions can become very memory 
> intensive. Instead, I suggest eagerly transforming the executions into their 
> archived version before adding them to the list, i.e. only the archived 
> version remains referenced after the execution becomes obsolete. This 
> gives better control over which information about the execution should really 
> be kept in memory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to