asfgit closed pull request #6500: [FLINK-10066] Keep only archived version of previous executions
URL: https://github.com/apache/flink/pull/6500
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 4b1c62fe707..ab8c94c0188 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
@@ -40,6 +41,8 @@
private final TaskManagerLocation assignedResourceLocation; // for the archived execution
+ private final AllocationID assignedAllocationID;
+
/* Continuously updated map of user-defined accumulators */
private final StringifiedAccumulatorResult[] userAccumulators;
@@ -48,21 +51,24 @@
private final IOMetrics ioMetrics;
public ArchivedExecution(Execution execution) {
- this.userAccumulators = execution.getUserAccumulatorsStringified();
- this.attemptId = execution.getAttemptId();
- this.attemptNumber = execution.getAttemptNumber();
- this.stateTimestamps = execution.getStateTimestamps();
- this.parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex();
- this.state = execution.getState();
- this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
- this.assignedResourceLocation = execution.getAssignedResourceLocation();
- this.ioMetrics = execution.getIOMetrics();
+ this(
+     execution.getUserAccumulatorsStringified(),
+     execution.getIOMetrics(),
+     execution.getAttemptId(),
+     execution.getAttemptNumber(),
+     execution.getState(),
+     ExceptionUtils.stringifyException(execution.getFailureCause()),
+     execution.getAssignedResourceLocation(),
+     execution.getAssignedAllocationID(),
+     execution.getVertex().getParallelSubtaskIndex(),
+     execution.getStateTimestamps());
}
public ArchivedExecution(
StringifiedAccumulatorResult[] userAccumulators,
IOMetrics ioMetrics,
ExecutionAttemptID attemptId, int attemptNumber,
ExecutionState state, String failureCause,
- TaskManagerLocation assignedResourceLocation, int parallelSubtaskIndex, long[] stateTimestamps) {
+ TaskManagerLocation assignedResourceLocation, AllocationID assignedAllocationID, int parallelSubtaskIndex,
+ long[] stateTimestamps) {
this.userAccumulators = userAccumulators;
this.ioMetrics = ioMetrics;
this.failureCause = failureCause;
@@ -72,6 +78,7 @@ public ArchivedExecution(
this.state = state;
this.stateTimestamps = stateTimestamps;
this.parallelSubtaskIndex = parallelSubtaskIndex;
+ this.assignedAllocationID = assignedAllocationID;
}
// --------------------------------------------------------------------------------------------
@@ -103,6 +110,10 @@ public TaskManagerLocation getAssignedResourceLocation() {
return assignedResourceLocation;
}
+ public AllocationID getAssignedAllocationID() {
+ return assignedAllocationID;
+ }
+
@Override
public String getFailureCauseAsString() {
return failureCause;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 36669d34be7..04efa048fb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -40,7 +40,7 @@
public ArchivedExecutionVertex(ExecutionVertex vertex) {
this.subTaskIndex = vertex.getParallelSubtaskIndex();
- this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER);
+ this.priorExecutions = vertex.getCopyOfPriorExecutionsList();
  this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
  this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
}
@@ -101,17 +101,4 @@ public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
   throw new IllegalArgumentException("attempt does not exist");
}
}
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER =
-     new EvictingBoundedList.Function<Execution, ArchivedExecution>() {
-
- @Override
- public ArchivedExecution apply(Execution value) {
- return value.archive();
- }
- };
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e385318b810..e4228011830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -92,7 +92,7 @@
private final int subTaskIndex;
- private final EvictingBoundedList<Execution> priorExecutions;
+ private final EvictingBoundedList<ArchivedExecution> priorExecutions;
private final Time timeout;
@@ -287,7 +287,7 @@ public TaskManagerLocation getCurrentAssignedResourceLocation() {
}
@Override
- public Execution getPriorExecutionAttempt(int attemptNumber) {
+ public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
synchronized (priorExecutions) {
if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
return priorExecutions.get(attemptNumber);
@@ -297,7 +297,7 @@ public Execution getPriorExecutionAttempt(int attemptNumber) {
}
}
- public Execution getLatestPriorExecution() {
+ public ArchivedExecution getLatestPriorExecution() {
synchronized (priorExecutions) {
final int size = priorExecutions.size();
if (size > 0) {
@@ -316,16 +316,16 @@ public Execution getLatestPriorExecution() {
 * @return The latest prior execution location, or null, if there is none, yet.
 */
public TaskManagerLocation getLatestPriorLocation() {
- Execution latestPriorExecution = getLatestPriorExecution();
+ ArchivedExecution latestPriorExecution = getLatestPriorExecution();
  return latestPriorExecution != null ? latestPriorExecution.getAssignedResourceLocation() : null;
}
public AllocationID getLatestPriorAllocation() {
- Execution latestPriorExecution = getLatestPriorExecution();
+ ArchivedExecution latestPriorExecution = getLatestPriorExecution();
  return latestPriorExecution != null ? latestPriorExecution.getAssignedAllocationID() : null;
}
- EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
+ EvictingBoundedList<ArchivedExecution> getCopyOfPriorExecutionsList() {
synchronized (priorExecutions) {
return new EvictingBoundedList<>(priorExecutions);
}
@@ -577,7 +577,7 @@ public Execution resetForNewExecution(final long timestamp, final long originati
final ExecutionState oldState = oldExecution.getState();
if (oldState.isTerminal()) {
- priorExecutions.add(oldExecution);
+ priorExecutions.add(oldExecution.archive());
final Execution newExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
index 2c5b6a9fbcb..9a529f3d40b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java
@@ -20,6 +20,8 @@
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nonnull;
+
import java.io.Serializable;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
@@ -146,6 +148,7 @@ private T accessInternal(int arrayIndex) {
return (T) elements[arrayIndex];
}
+ @Nonnull
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@@ -176,50 +179,4 @@ public void remove() {
}
};
}
-
- /**
- * Creates a new list that replaces its elements with transformed elements.
- * The list retains the same size and position-to-element mapping.
- *
- * <p>Note that null values are automatically mapped to null values.
- *
- * @param transform The function used to transform each element
- * @param <R> The type of the elements in the result list.
- *
- * @return The list with the mapped elements
- */
- public <R> EvictingBoundedList<R> map(Function<T, R> transform) {
- // map the default element
- final R newDefault = defaultElement == null ? null : transform.apply(defaultElement);
-
- // copy the list with the new default
- final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault);
- result.count = count;
- result.idx = idx;
-
- // map all the entries in the list
- final int numElements = Math.min(elements.length, count);
- for (int i = 0; i < numElements; i++) {
- result.elements[i] = transform.apply(accessInternal(i));
- }
-
- return result;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * A simple unary function that can be used to transform elements via the
- * {@link EvictingBoundedList#map(Function)} method.
- */
- public interface Function<I, O> {
-
- /**
- * Transforms the value.
- *
- * @param value The value to transform
- * @return The transformed value
- */
- O apply(I value);
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
index af8b995a3e6..99dcbf76305 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
@@ -93,6 +94,7 @@ public void testHandleRequest() throws Exception {
timestamps[expectedState.ordinal()] = finishedTs;
final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation();
+ final AllocationID allocationID = new AllocationID();
final int subtaskIndex = 1;
final int attempt = 2;
@@ -104,6 +106,7 @@ public void testHandleRequest() throws Exception {
expectedState,
null,
assignedResourceLocation,
+ allocationID,
subtaskIndex,
timestamps);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index df4ff04bf74..1099e4e71f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -99,6 +99,7 @@ public void testHandleRequest() throws Exception {
ExecutionState.FINISHED,
null,
null,
+ null,
subtaskIndex,
new long[ExecutionState.values().length]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index 8e44c0e9731..22a7d774e8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -104,6 +104,7 @@ public void testHandleRequest() throws Exception {
expectedState,
null,
null,
+ null,
subtaskIndex,
new long[ExecutionState.values().length]),
new EvictingBoundedList<>(0)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
index ad5cd6be739..38c1d7ed982 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java
@@ -21,6 +21,7 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
@@ -42,6 +43,7 @@
private ExecutionState state;
private String failureCause;
private TaskManagerLocation assignedResourceLocation;
+ private AllocationID assignedAllocationID;
private StringifiedAccumulatorResult[] userAccumulators;
private IOMetrics ioMetrics;
private int parallelSubtaskIndex;
@@ -77,6 +79,11 @@ public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation
return this;
}
+ public ArchivedExecutionBuilder setAssignedAllocationID(AllocationID assignedAllocationID) {
+ this.assignedAllocationID = assignedAllocationID;
+ return this;
+ }
+
public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
this.userAccumulators = userAccumulators;
return this;
@@ -101,6 +108,7 @@ public ArchivedExecution build() throws UnknownHostException {
state != null ? state : ExecutionState.FINISHED,
failureCause != null ? failureCause : "(null)",
  assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234),
+ assignedAllocationID != null ? assignedAllocationID : new AllocationID(0L, 0L),
  parallelSubtaskIndex,
  stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5}
);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
index 92b0d8acee3..caf0fed2ae7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
@@ -108,11 +109,13 @@ private static void generateArchivedJob() throws Exception {
 StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1");
 StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2");
 TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234);
+ AllocationID allocationID = new AllocationID(42L, 43L);
 originalAttempt = new ArchivedExecutionBuilder()
     .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
     .setParallelSubtaskIndex(1)
     .setAttemptNumber(0)
     .setAssignedResourceLocation(location)
+    .setAssignedAllocationID(allocationID)
     .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
.setState(ExecutionState.FINISHED)
.setFailureCause("attemptException")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
index 7109dacb20d..40bd0216153 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java
@@ -28,12 +28,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
+/**
+ * Tests for {@link EvictingBoundedList}.
+ */
public class EvictingBoundedListTest {
@Test
@@ -164,96 +164,4 @@ public void testIterator() {
}
}
-
- @Test
- public void testMapWithHalfFullList() {
- final Object[] originals = { new Object(), new Object(), new Object() };
- final Object defaultValue = new Object();
-
- final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue);
- for (Object o : originals) {
- original.add(o);
- }
-
- final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
-
- assertEquals(original.size(), transformed.size());
- assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
- assertEquals(defaultValue, transformed.getDefaultElement().original);
-
- int i = 0;
- for (TransformedObject to : transformed) {
- assertEquals(originals[i++], to.original);
- }
-
- try {
- transformed.get(originals.length);
- fail("should have failed with an exception");
- } catch (IndexOutOfBoundsException e) {
- // expected
- }
- }
-
- @Test
- public void testMapWithEvictedElements() {
- final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() };
- final Object defaultValue = new Object();
-
- final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue);
- for (Object o : originals) {
- original.add(o);
- }
-
- final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
-
- assertEquals(originals.length, transformed.size());
- assertEquals(original.size(), transformed.size());
- assertEquals(original.getSizeLimit(), transformed.getSizeLimit());
- assertEquals(defaultValue, transformed.getDefaultElement().original);
-
- for (int i = 0; i < originals.length; i++) {
- if (i < originals.length - transformed.getSizeLimit()) {
- assertEquals(transformed.getDefaultElement(), transformed.get(i));
- } else {
- assertEquals(originals[i], transformed.get(i).original);
- }
- }
-
- try {
- transformed.get(originals.length);
- fail("should have failed with an exception");
- } catch (IndexOutOfBoundsException e) {
- // expected
- }
- }
-
- @Test
- public void testMapWithNullDefault() {
- final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null);
- final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper());
-
- assertEquals(original.size(), transformed.size());
- assertNull(transformed.getDefaultElement());
- }
-
- // ------------------------------------------------------------------------
-
- private static final class TransformedObject {
-
- final Object original;
-
- TransformedObject(Object original) {
- this.original = checkNotNull(original);
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> {
-
- @Override
- public TransformedObject apply(Object value) {
- return new TransformedObject(value);
- }
- }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services