asfgit closed pull request #6500: [FLINK-10066] Keep only archived version of previous executions URL: https://github.com/apache/flink/pull/6500
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance. Because this is a foreign pull request (from a fork), the diff would not otherwise be shown on GitHub, so it is supplied here: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java index 4b1c62fe707..ab8c94c0188 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; @@ -40,6 +41,8 @@ private final TaskManagerLocation assignedResourceLocation; // for the archived execution + private final AllocationID assignedAllocationID; + /* Continuously updated map of user-defined accumulators */ private final StringifiedAccumulatorResult[] userAccumulators; @@ -48,21 +51,24 @@ private final IOMetrics ioMetrics; public ArchivedExecution(Execution execution) { - this.userAccumulators = execution.getUserAccumulatorsStringified(); - this.attemptId = execution.getAttemptId(); - this.attemptNumber = execution.getAttemptNumber(); - this.stateTimestamps = execution.getStateTimestamps(); - this.parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex(); - this.state = execution.getState(); - this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause()); - this.assignedResourceLocation = execution.getAssignedResourceLocation(); - this.ioMetrics = 
execution.getIOMetrics(); + this( + execution.getUserAccumulatorsStringified(), + execution.getIOMetrics(), + execution.getAttemptId(), + execution.getAttemptNumber(), + execution.getState(), + ExceptionUtils.stringifyException(execution.getFailureCause()), + execution.getAssignedResourceLocation(), + execution.getAssignedAllocationID(), + execution.getVertex().getParallelSubtaskIndex(), + execution.getStateTimestamps()); } public ArchivedExecution( StringifiedAccumulatorResult[] userAccumulators, IOMetrics ioMetrics, ExecutionAttemptID attemptId, int attemptNumber, ExecutionState state, String failureCause, - TaskManagerLocation assignedResourceLocation, int parallelSubtaskIndex, long[] stateTimestamps) { + TaskManagerLocation assignedResourceLocation, AllocationID assignedAllocationID, int parallelSubtaskIndex, + long[] stateTimestamps) { this.userAccumulators = userAccumulators; this.ioMetrics = ioMetrics; this.failureCause = failureCause; @@ -72,6 +78,7 @@ public ArchivedExecution( this.state = state; this.stateTimestamps = stateTimestamps; this.parallelSubtaskIndex = parallelSubtaskIndex; + this.assignedAllocationID = assignedAllocationID; } // -------------------------------------------------------------------------------------------- @@ -103,6 +110,10 @@ public TaskManagerLocation getAssignedResourceLocation() { return assignedResourceLocation; } + public AllocationID getAssignedAllocationID() { + return assignedAllocationID; + } + @Override public String getFailureCauseAsString() { return failureCause; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java index 36669d34be7..04efa048fb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java @@ -40,7 
+40,7 @@ public ArchivedExecutionVertex(ExecutionVertex vertex) { this.subTaskIndex = vertex.getParallelSubtaskIndex(); - this.priorExecutions = vertex.getCopyOfPriorExecutionsList().map(ARCHIVER); + this.priorExecutions = vertex.getCopyOfPriorExecutionsList(); this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex(); this.currentExecution = vertex.getCurrentExecutionAttempt().archive(); } @@ -101,17 +101,4 @@ public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) { throw new IllegalArgumentException("attempt does not exist"); } } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private static final EvictingBoundedList.Function<Execution, ArchivedExecution> ARCHIVER = - new EvictingBoundedList.Function<Execution, ArchivedExecution>() { - - @Override - public ArchivedExecution apply(Execution value) { - return value.archive(); - } - }; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e385318b810..e4228011830 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -92,7 +92,7 @@ private final int subTaskIndex; - private final EvictingBoundedList<Execution> priorExecutions; + private final EvictingBoundedList<ArchivedExecution> priorExecutions; private final Time timeout; @@ -287,7 +287,7 @@ public TaskManagerLocation getCurrentAssignedResourceLocation() { } @Override - public Execution getPriorExecutionAttempt(int attemptNumber) { + public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) { synchronized (priorExecutions) { if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) { return 
priorExecutions.get(attemptNumber); @@ -297,7 +297,7 @@ public Execution getPriorExecutionAttempt(int attemptNumber) { } } - public Execution getLatestPriorExecution() { + public ArchivedExecution getLatestPriorExecution() { synchronized (priorExecutions) { final int size = priorExecutions.size(); if (size > 0) { @@ -316,16 +316,16 @@ public Execution getLatestPriorExecution() { * @return The latest prior execution location, or null, if there is none, yet. */ public TaskManagerLocation getLatestPriorLocation() { - Execution latestPriorExecution = getLatestPriorExecution(); + ArchivedExecution latestPriorExecution = getLatestPriorExecution(); return latestPriorExecution != null ? latestPriorExecution.getAssignedResourceLocation() : null; } public AllocationID getLatestPriorAllocation() { - Execution latestPriorExecution = getLatestPriorExecution(); + ArchivedExecution latestPriorExecution = getLatestPriorExecution(); return latestPriorExecution != null ? latestPriorExecution.getAssignedAllocationID() : null; } - EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() { + EvictingBoundedList<ArchivedExecution> getCopyOfPriorExecutionsList() { synchronized (priorExecutions) { return new EvictingBoundedList<>(priorExecutions); } @@ -577,7 +577,7 @@ public Execution resetForNewExecution(final long timestamp, final long originati final ExecutionState oldState = oldExecution.getState(); if (oldState.isTerminal()) { - priorExecutions.add(oldExecution); + priorExecutions.add(oldExecution.archive()); final Execution newExecution = new Execution( getExecutionGraph().getFutureExecutor(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java index 2c5b6a9fbcb..9a529f3d40b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EvictingBoundedList.java @@ -20,6 +20,8 @@ import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + import java.io.Serializable; import java.util.ConcurrentModificationException; import java.util.Iterator; @@ -146,6 +148,7 @@ private T accessInternal(int arrayIndex) { return (T) elements[arrayIndex]; } + @Nonnull @Override public Iterator<T> iterator() { return new Iterator<T>() { @@ -176,50 +179,4 @@ public void remove() { } }; } - - /** - * Creates a new list that replaces its elements with transformed elements. - * The list retains the same size and position-to-element mapping. - * - * <p>Note that null values are automatically mapped to null values. - * - * @param transform The function used to transform each element - * @param <R> The type of the elements in the result list. - * - * @return The list with the mapped elements - */ - public <R> EvictingBoundedList<R> map(Function<T, R> transform) { - // map the default element - final R newDefault = defaultElement == null ? null : transform.apply(defaultElement); - - // copy the list with the new default - final EvictingBoundedList<R> result = new EvictingBoundedList<>(elements.length, newDefault); - result.count = count; - result.idx = idx; - - // map all the entries in the list - final int numElements = Math.min(elements.length, count); - for (int i = 0; i < numElements; i++) { - result.elements[i] = transform.apply(accessInternal(i)); - } - - return result; - } - - // ------------------------------------------------------------------------ - - /** - * A simple unary function that can be used to transform elements via the - * {@link EvictingBoundedList#map(Function)} method. - */ - public interface Function<I, O> { - - /** - * Transforms the value. 
- * - * @param value The value to transform - * @return The transformed value - */ - O apply(I value); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index af8b995a3e6..99dcbf76305 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecution; import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; @@ -93,6 +94,7 @@ public void testHandleRequest() throws Exception { timestamps[expectedState.ordinal()] = finishedTs; final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation(); + final AllocationID allocationID = new AllocationID(); final int subtaskIndex = 1; final int attempt = 2; @@ -104,6 +106,7 @@ public void testHandleRequest() throws Exception { expectedState, null, assignedResourceLocation, + allocationID, subtaskIndex, timestamps); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index df4ff04bf74..1099e4e71f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -99,6 +99,7 @@ public void testHandleRequest() throws Exception { ExecutionState.FINISHED, null, null, + null, subtaskIndex, new long[ExecutionState.values().length]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index 8e44c0e9731..22a7d774e8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -104,6 +104,7 @@ public void testHandleRequest() throws Exception { expectedState, null, null, + null, subtaskIndex, new long[ExecutionState.values().length]), new EvictingBoundedList<>(0) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java index ad5cd6be739..38c1d7ed982 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionBuilder.java @@ -21,6 +21,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MeterView; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecution; @@ -42,6 +43,7 @@ private ExecutionState 
state; private String failureCause; private TaskManagerLocation assignedResourceLocation; + private AllocationID assignedAllocationID; private StringifiedAccumulatorResult[] userAccumulators; private IOMetrics ioMetrics; private int parallelSubtaskIndex; @@ -77,6 +79,11 @@ public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation return this; } + public ArchivedExecutionBuilder setAssignedAllocationID(AllocationID assignedAllocationID) { + this.assignedAllocationID = assignedAllocationID; + return this; + } + public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) { this.userAccumulators = userAccumulators; return this; @@ -101,6 +108,7 @@ public ArchivedExecution build() throws UnknownHostException { state != null ? state : ExecutionState.FINISHED, failureCause != null ? failureCause : "(null)", assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234), + assignedAllocationID != null ? assignedAllocationID : new AllocationID(0L, 0L), parallelSubtaskIndex, stateTimestamps != null ? 
stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5} ); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java index 92b0d8acee3..caf0fed2ae7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; @@ -108,11 +109,13 @@ private static void generateArchivedJob() throws Exception { StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1"); StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2"); TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234); + AllocationID allocationID = new AllocationID(42L, 43L); originalAttempt = new ArchivedExecutionBuilder() .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) .setParallelSubtaskIndex(1) .setAttemptNumber(0) .setAssignedResourceLocation(location) + .setAssignedAllocationID(allocationID) .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) .setState(ExecutionState.FINISHED) .setFailureCause("attemptException") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java index 
7109dacb20d..40bd0216153 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EvictingBoundedListTest.java @@ -28,12 +28,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.apache.flink.util.Preconditions.checkNotNull; - +/** + * Tests for {@link EvictingBoundedList}. + */ public class EvictingBoundedListTest { @Test @@ -164,96 +164,4 @@ public void testIterator() { } } - - @Test - public void testMapWithHalfFullList() { - final Object[] originals = { new Object(), new Object(), new Object() }; - final Object defaultValue = new Object(); - - final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, defaultValue); - for (Object o : originals) { - original.add(o); - } - - final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper()); - - assertEquals(original.size(), transformed.size()); - assertEquals(original.getSizeLimit(), transformed.getSizeLimit()); - assertEquals(defaultValue, transformed.getDefaultElement().original); - - int i = 0; - for (TransformedObject to : transformed) { - assertEquals(originals[i++], to.original); - } - - try { - transformed.get(originals.length); - fail("should have failed with an exception"); - } catch (IndexOutOfBoundsException e) { - // expected - } - } - - @Test - public void testMapWithEvictedElements() { - final Object[] originals = { new Object(), new Object(), new Object(), new Object(), new Object() }; - final Object defaultValue = new Object(); - - final EvictingBoundedList<Object> original = new EvictingBoundedList<>(2, defaultValue); - for (Object o : originals) { - original.add(o); - } - - final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper()); - - 
assertEquals(originals.length, transformed.size()); - assertEquals(original.size(), transformed.size()); - assertEquals(original.getSizeLimit(), transformed.getSizeLimit()); - assertEquals(defaultValue, transformed.getDefaultElement().original); - - for (int i = 0; i < originals.length; i++) { - if (i < originals.length - transformed.getSizeLimit()) { - assertEquals(transformed.getDefaultElement(), transformed.get(i)); - } else { - assertEquals(originals[i], transformed.get(i).original); - } - } - - try { - transformed.get(originals.length); - fail("should have failed with an exception"); - } catch (IndexOutOfBoundsException e) { - // expected - } - } - - @Test - public void testMapWithNullDefault() { - final EvictingBoundedList<Object> original = new EvictingBoundedList<>(5, null); - final EvictingBoundedList<TransformedObject> transformed = original.map(new Mapper()); - - assertEquals(original.size(), transformed.size()); - assertNull(transformed.getDefaultElement()); - } - - // ------------------------------------------------------------------------ - - private static final class TransformedObject { - - final Object original; - - TransformedObject(Object original) { - this.original = checkNotNull(original); - } - } - - // ------------------------------------------------------------------------ - - private static final class Mapper implements EvictingBoundedList.Function<Object, TransformedObject> { - - @Override - public TransformedObject apply(Object value) { - return new TransformedObject(value); - } - } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services