[
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614802#comment-16614802
]
ASF GitHub Bot commented on FLINK-10255:
----------------------------------------
asfgit closed pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924b448..d8ad5aba8ea 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.Properties;
@@ -36,8 +37,8 @@
 	@Nonnull
 	private final SavepointRestoreSettings savepointRestoreSettings;
-	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
-		super(configDir, dynamicProperties, args, restPort);
+	public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
+		super(configDir, dynamicProperties, args, hostname, restPort);
 		this.jobClassName = jobClassName;
 		this.savepointRestoreSettings = savepointRestoreSettings;
 	}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba864ed..17217eff018 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 /**
@@ -67,6 +68,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
 		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
 		final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortString);
+		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
 		final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
 		final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
@@ -74,6 +76,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
+			hostname,
 			restPort,
 			jobClassName,
 			savepointRestoreSettings);
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a985d..6fc5b76f246 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@
  * @param <E> type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException<T, U, E extends Throwable> extends BiConsumer<T, U> {
+public interface BiConsumerWithException<T, U, E extends Throwable> {
 	/**
 	 * Performs this operation on the given arguments.
@@ -39,14 +39,23 @@
 	 * @param u the second input argument
 	 * @throws E in case of an error
 	 */
-	void acceptWithException(T t, U u) throws E;
+	void accept(T t, U u) throws E;
-	@Override
-	default void accept(T t, U u) {
-		try {
-			acceptWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-		}
+	/**
+	 * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+	 *
+	 * @param biConsumerWithException BiConsumer with exception to convert into a {@link BiConsumer}.
+	 * @param <A> first input type
+	 * @param <B> second input type
+	 * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+	 */
+	static <A, B> BiConsumer<A, B> unchecked(BiConsumerWithException<A, B, ?> biConsumerWithException) {
+		return (A a, B b) -> {
+			try {
+				biConsumerWithException.accept(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
 	}
 }
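For illustration, a minimal usage sketch of the new static adapter; the map
name and the file-writing body are invented for the example:

    Map<String, Integer> counts = new HashMap<>();
    BiConsumerWithException<String, Integer, IOException> writeEntry =
        (key, value) -> Files.write(Paths.get(key), value.toString().getBytes());
    // Map#forEach expects a plain java.util.function.BiConsumer, so the
    // throwing consumer is adapted with unchecked(...).
    counts.forEach(BiConsumerWithException.unchecked(writeEntry));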
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
index 967c737e584..ccba8a7e774 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiFunctionWithException.java
@@ -31,7 +31,7 @@
  * @param <E> type of the exception which can be thrown
  */
 @FunctionalInterface
-public interface BiFunctionWithException<T, U, R, E extends Throwable> extends BiFunction<T, U, R> {
+public interface BiFunctionWithException<T, U, R, E extends Throwable> {
 	/**
 	 * Apply the given values t and u to obtain the resulting value. The operation can
@@ -42,16 +42,25 @@
 	 * @return result value
 	 * @throws E if the operation fails
 	 */
-	R applyWithException(T t, U u) throws E;
+	R apply(T t, U u) throws E;
-	default R apply(T t, U u) {
-		try {
-			return applyWithException(t, u);
-		} catch (Throwable e) {
-			ExceptionUtils.rethrow(e);
-			// we have to return a value to please the compiler
-			// but we will never reach the code here
-			return null;
-		}
+	/**
+	 * Convert a {@link BiFunctionWithException} into a {@link BiFunction}.
+	 *
+	 * @param biFunctionWithException function with exception to convert into a function
+	 * @param <A> first input type
+	 * @param <B> second input type
+	 * @param <C> output type
+	 * @return {@link BiFunction} which rethrows all checked exceptions as unchecked.
+	 */
+	static <A, B, C> BiFunction<A, B, C> unchecked(BiFunctionWithException<A, B, C, ?> biFunctionWithException) {
+		return (A a, B b) -> {
+			try {
+				return biFunctionWithException.apply(a, b);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// we need this to appease the compiler :-(
+				return null;
+			}
+		};
 	}
 }
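Similarly, a hypothetical sketch of the BiFunction adapter; the map and the
merge function are invented for the example:

    Map<String, Integer> totals = new HashMap<>();
    BiFunctionWithException<Integer, Integer, Integer, IOException> mergeAndLog =
        (oldValue, newValue) -> oldValue + newValue; // imagine this also logging to a file
    // Map#merge expects a plain java.util.function.BiFunction.
    totals.merge("records", 1, BiFunctionWithException.unchecked(mergeAndLog));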
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
deleted file mode 100644
index 09507d4e9f2..00000000000
--- a/flink-core/src/main/java/org/apache/flink/util/function/ConsumerWithException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util.function;
-
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.function.Consumer;
-
-/**
- * A checked extension of the {@link Consumer} interface.
- *
- * @param <T> type of the first argument
- * @param <E> type of the thrown exception
- */
-public interface ConsumerWithException<T, E extends Throwable> extends Consumer<T> {
-
- void acceptWithException(T value) throws E;
-
- @Override
- default void accept(T value) {
- try {
- acceptWithException(value);
- } catch (Throwable t) {
- ExceptionUtils.rethrow(t);
- }
- }
-}
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
new file mode 100644
index 00000000000..678ef9f78b6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Utility class for Flink's functions.
+ */
+public class FunctionUtils {
+
+	private FunctionUtils() {
+		throw new UnsupportedOperationException("This class should never be instantiated.");
+	}
+
+	/**
+	 * Convert a {@link FunctionWithException} into a {@link Function}.
+	 *
+	 * @param functionWithException function with exception to convert into a function
+	 * @param <A> input type
+	 * @param <B> output type
+	 * @return {@link Function} which rethrows all checked exceptions as unchecked.
+	 */
+	public static <A, B> Function<A, B> uncheckedFunction(FunctionWithException<A, B, ?> functionWithException) {
+		return (A value) -> {
+			try {
+				return functionWithException.apply(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+				// we need this to appease the compiler :-(
+				return null;
+			}
+		};
+	}
+
+	/**
+	 * Converts a {@link ThrowingConsumer} into a {@link Consumer} which throws checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingConsumer to convert into a {@link Consumer}
+	 * @param <A> input type
+	 * @return {@link Consumer} which throws all checked exceptions as unchecked
+	 */
+	public static <A> Consumer<A> uncheckedConsumer(ThrowingConsumer<A, ?> throwingConsumer) {
+		return (A value) -> {
+			try {
+				throwingConsumer.accept(value);
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
+}
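A short, hypothetical sketch of the two helpers in stream code; the Writer
and the input values are invented for the example:

    Writer writer = new StringWriter();
    Stream.of("1", "2", "3")
        // Integer::parseInt fits FunctionWithException and is adapted to a plain Function
        .map(FunctionUtils.uncheckedFunction(Integer::parseInt))
        // Writer#write throws the checked IOException; uncheckedConsumer rethrows it unchecked
        .forEach(FunctionUtils.uncheckedConsumer(value -> writer.write(value + "\n")));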
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
index 4fef4207838..0dd4047a1e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/ThrowingRunnable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util.function;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
 /**
  * Similar to a {@link Runnable}, this interface is used to capture a block of code
@@ -35,4 +36,21 @@
 	 * @throws E Exceptions may be thrown.
 	 */
 	void run() throws E;
+
+	/**
+	 * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
+	 * as unchecked.
+	 *
+	 * @param throwingRunnable to convert into a {@link Runnable}
+	 * @return {@link Runnable} which throws all checked exceptions as unchecked.
+	 */
+	static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+		return () -> {
+			try {
+				throwingRunnable.run();
+			} catch (Throwable t) {
+				ExceptionUtils.rethrow(t);
+			}
+		};
+	}
 }
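And a minimal sketch for the Runnable adapter; the executor is invented for
the example:

    ExecutorService executor = Executors.newSingleThreadExecutor();
    // Thread.sleep throws the checked InterruptedException, so a plain
    // Runnable lambda would not compile here without the adapter.
    executor.execute(ThrowingRunnable.unchecked(() -> Thread.sleep(100L)));
    executor.shutdown();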
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 131733924ae..e443fc21552 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,7 +25,7 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
@@ -246,7 +246,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception
 		LOG.debug("Added {} to {}.", checkpoint, path);
 	}
-	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> discardCallback) {
+	private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoint, ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback) {
 		try {
 			if (tryRemove(completedCheckpoint.getCheckpointID())) {
 				executor.execute(() -> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c31e64c0adc..5279e502a93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -60,11 +60,13 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.ConsumerWithException;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -128,6 +130,8 @@
 	private final Map<JobID, CompletableFuture<Void>> jobManagerTerminationFutures;
+	private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
+
 	public Dispatcher(
 			RpcService rpcService,
 			String endpointId,
@@ -629,31 +633,51 @@ private void terminateJobManagerRunners() {
 	 * Recovers all jobs persisted via the submitted job graph store.
 	 */
 	@VisibleForTesting
-	CompletableFuture<Collection<JobGraph>> recoverJobs() {
+	Collection<JobGraph> recoverJobs() throws Exception {
 		log.info("Recovering all persisted jobs.");
-		return FutureUtils.supplyAsync(
-			() -> {
-				final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
-
-				final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+		final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
-				for (JobID jobId : jobIds) {
-					jobGraphs.add(recoverJob(jobId));
+		try {
+			return recoverJobGraphs(jobIds);
+		} catch (Exception e) {
+			// release all recovered job graphs
+			for (JobID jobId : jobIds) {
+				try {
+					submittedJobGraphStore.releaseJobGraph(jobId);
+				} catch (Exception ie) {
+					e.addSuppressed(ie);
 				}
+			}
+			throw e;
+		}
+	}
-				return jobGraphs;
-			},
-			getRpcService().getExecutor());
+	@Nonnull
+	private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
+		final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
+
+		for (JobID jobId : jobIds) {
+			final JobGraph jobGraph = recoverJob(jobId);
+
+			if (jobGraph == null) {
+				throw new FlinkJobNotFoundException(jobId);
+			}
+
+			jobGraphs.add(jobGraph);
+		}
+
+		return jobGraphs;
 	}
+	@Nullable
 	private JobGraph recoverJob(JobID jobId) throws Exception {
 		log.debug("Recover job {}.", jobId);
-		SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
+		final SubmittedJobGraph submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId);
 		if (submittedJobGraph != null) {
 			return submittedJobGraph.getJobGraph();
 		} else {
-			throw new FlinkJobNotFoundException(jobId);
+			return null;
 		}
 	}
@@ -768,27 +792,40 @@ private void jobMasterFailed(JobID jobId, Throwable cause) {
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
+		runAsyncWithoutFencing(
+			() -> {
+				log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
-		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
+				final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
+					FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
+					getRpcService().getExecutor());
-		final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
-			(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
-			getUnfencedMainThreadExecutor());
+				final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
+					(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
+					getUnfencedMainThreadExecutor());
-		final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenAcceptAsync(
-			(Boolean confirmLeadership) -> {
-				if (confirmLeadership) {
-					leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-				}
-			},
-			getRpcService().getExecutor());
+				final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
+					recoveredJobsFuture,
+					BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
+						if (confirmLeadership) {
+							leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+						} else {
+							for (JobGraph recoveredJob : recoveredJobs) {
+								submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
+							}
+						}
+						return null;
+					}),
+					getRpcService().getExecutor());
+
+				confirmationFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							onFatalError(ExceptionUtils.stripCompletionException(throwable));
+						}
+					});
-		confirmationFuture.whenComplete(
-			(Void ignored, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(ExceptionUtils.stripCompletionException(throwable));
-				}
+				recoveryOperation = confirmationFuture;
 			});
 	}
@@ -813,7 +850,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 		}
 	}
-	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
+	private CompletableFuture<Void> waitForTerminatingJobManager(JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
 		final CompletableFuture<Void> jobManagerTerminationFuture = getJobTerminationFuture(jobId)
 			.exceptionally((Throwable throwable) -> {
 				throw new CompletionException(
@@ -822,14 +859,14 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 					throwable)); });
 		return jobManagerTerminationFuture.thenRunAsync(
-			() -> {
+			ThrowingRunnable.unchecked(() -> {
 				jobManagerTerminationFutures.remove(jobId);
 				action.accept(jobGraph);
-			},
+			}),
 			getMainThreadExecutor());
 	}
-	protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+	CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
 		if (jobManagerRunners.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
@@ -837,6 +874,11 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 		}
 	}
+	@VisibleForTesting
+	CompletableFuture<Void> getRecoveryOperation() {
+		return recoveryOperation;
+	}
+
 	private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
 		// clear the state if we've been the leader before
 		if (getFencingToken() != null) {
@@ -879,24 +921,63 @@ public void handleError(final Exception exception) {
 	@Override
 	public void onAddedJobGraph(final JobID jobId) {
-		final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-			() -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-		final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-			(SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-			getMainThreadExecutor());
-
-		submissionFuture.whenComplete(
-			(Acknowledge acknowledge, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(
-						new DispatcherException(
-							String.format("Could not start the added job %s", jobId),
-							ExceptionUtils.stripCompletionException(throwable)));
+		runAsync(
+			() -> {
+				if (!jobManagerRunners.containsKey(jobId)) {
+					// IMPORTANT: onAddedJobGraph can generate false positives and, thus, we must expect that
+					// the specified job is already removed from the SubmittedJobGraphStore. In this case,
+					// SubmittedJobGraphStore.recoverJob returns null.
+					final CompletableFuture<Optional<JobGraph>> recoveredJob = recoveryOperation.thenApplyAsync(
+						FunctionUtils.uncheckedFunction(ignored -> Optional.ofNullable(recoverJob(jobId))),
+						getRpcService().getExecutor());
+
+					final DispatcherId dispatcherId = getFencingToken();
+					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+						(Optional<JobGraph> jobGraphOptional) -> jobGraphOptional.map(
+							FunctionUtils.uncheckedFunction(jobGraph -> tryRunRecoveredJobGraph(jobGraph, dispatcherId).thenAcceptAsync(
+								FunctionUtils.uncheckedConsumer((Boolean isRecoveredJobRunning) -> {
+									if (!isRecoveredJobRunning) {
+										submittedJobGraphStore.releaseJobGraph(jobId);
+									}
+								}),
+								getRpcService().getExecutor())))
+							.orElse(CompletableFuture.completedFuture(null)),
+						getUnfencedMainThreadExecutor());
+
+					submissionFuture.whenComplete(
+						(Void ignored, Throwable throwable) -> {
+							if (throwable != null) {
+								onFatalError(
+									new DispatcherException(
+										String.format("Could not start the added job %s", jobId),
+										ExceptionUtils.stripCompletionException(throwable)));
+							}
+						});
+
+					recoveryOperation = submissionFuture;
 				}
 			});
 	}
+	private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
+		if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
+			final JobID jobId = jobGraph.getJobID();
+			if (jobManagerRunners.containsKey(jobId)) {
+				// we must not release the job graph lock since it can only be locked once and
+				// is currently being executed. Once we support multiple locks, we must release
+				// the JobGraph here
+				log.debug("Ignore added JobGraph because the job {} is already running.", jobId);
+				return CompletableFuture.completedFuture(true);
+			} else if (runningJobsRegistry.getJobSchedulingStatus(jobId) != RunningJobsRegistry.JobSchedulingStatus.DONE) {
+				return waitForTerminatingJobManager(jobId, jobGraph, this::runJob).thenApply(ignored -> true);
+			} else {
+				log.debug("Ignore added JobGraph because the job {} has already been completed.", jobId);
+			}
+		}
+
+		return CompletableFuture.completedFuture(false);
+	}
+
 	@Override
 	public void onRemovedJobGraph(final JobID jobId) {
 		runAsync(() -> {
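A point worth noting in the Dispatcher changes above: the new recoveryOperation
field chains every recovery attempt onto the previous one, so recoveries
triggered by grantLeadership and onAddedJobGraph cannot interleave. A
stripped-down, hypothetical illustration of that chaining pattern (class and
method names invented):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;

    class RecoveryChain {
        private final Executor executor;
        // completed future as the neutral head of the chain, as in the diff above
        private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);

        RecoveryChain(Executor executor) {
            this.executor = executor;
        }

        // each new action runs strictly after the previous recovery has completed
        void enqueue(Runnable recoveryAction) {
            recoveryOperation = recoveryOperation.thenRunAsync(recoveryAction, executor);
        }
    }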
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ddd3751cc2a..0fd4389fc35 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -708,6 +708,12 @@ protected static Configuration loadConfiguration(EntrypointClusterConfiguration
 			configuration.setInteger(RestOptions.PORT, restPort);
 		}
+		final String hostname = entrypointClusterConfiguration.getHostname();
+
+		if (hostname != null) {
+			configuration.setString(JobManagerOptions.ADDRESS, hostname);
+		}
+
 		return configuration;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
index 75cad7aa946..3472f35d6a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.entrypoint;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.Properties;
@@ -27,14 +28,23 @@
  */
 public class EntrypointClusterConfiguration extends ClusterConfiguration {
+	@Nullable
+	private final String hostname;
+
 	private final int restPort;
-	public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort) {
+	public EntrypointClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort) {
 		super(configDir, dynamicProperties, args);
+		this.hostname = hostname;
 		this.restPort = restPort;
 	}
 	public int getRestPort() {
 		return restPort;
 	}
+
+	@Nullable
+	public String getHostname() {
+		return hostname;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
index 7dfb784a79c..52f59eeef8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactory.java
@@ -29,6 +29,8 @@
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.EXECUTION_MODE_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 /**
@@ -42,6 +44,8 @@ public Options getOptions() {
 		options.addOption(CONFIG_DIR_OPTION);
 		options.addOption(REST_PORT_OPTION);
 		options.addOption(DYNAMIC_PROPERTY_OPTION);
+		options.addOption(HOST_OPTION);
+		options.addOption(EXECUTION_MODE_OPTION);
 		return options;
 	}
@@ -52,11 +56,13 @@ public EntrypointClusterConfiguration createResult(@Nonnull CommandLine commandL
 		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
 		final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
 		final int restPort = Integer.parseInt(restPortStr);
+		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
 		return new EntrypointClusterConfiguration(
 			configDir,
 			dynamicProperties,
 			commandLine.getArgs(),
+			hostname,
 			restPort);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
index 23c9da2485f..443014b9903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/parser/CommandLineOptions.java
@@ -48,5 +48,26 @@
 		.desc("use value for given property")
 		.build();
+	public static final Option HOST_OPTION = Option.builder("h")
+		.longOpt("host")
+		.required(false)
+		.hasArg(true)
+		.argName("hostname")
+		.desc("Hostname for the RPC service.")
+		.build();
+
+	/**
+	 * @deprecated exists only for compatibility with legacy mode. Remove once legacy mode
+	 * and execution mode option has been removed.
+	 */
+	@Deprecated
+	public static final Option EXECUTION_MODE_OPTION = Option.builder("x")
+		.longOpt("executionMode")
+		.required(false)
+		.hasArg(true)
+		.argName("execution mode")
+		.desc("Deprecated option")
+		.build();
+
 	private CommandLineOptions() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 4a66d32a2ac..736984e88e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1483,7 +1483,7 @@ private void rescaleJobGraph(Collection<JobVertexID> operators, int newParalleli
 			jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
 		}
-		rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+		rescalingBehaviour.accept(jobVertex, newParallelism);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
index 7de956081d8..64e2ffa1124 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -29,7 +29,7 @@
 	// rescaling is only executed if the operator can be set to the given parallelism
 	STRICT {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
+		public void accept(JobVertex jobVertex, Integer newParallelism) throws FlinkException {
 			if (jobVertex.getMaxParallelism() < newParallelism) {
 				throw new FlinkException("Cannot rescale vertex " + jobVertex.getName() +
 					" because its maximum parallelism " + jobVertex.getMaxParallelism() +
@@ -42,7 +42,7 @@ public void acceptWithException(JobVertex jobVertex, Integer newParallelism) thr
 	// the new parallelism will be the minimum of the given parallelism and the maximum parallelism
 	RELAXED {
 		@Override
-		public void acceptWithException(JobVertex jobVertex, Integer newParallelism) {
+		public void accept(JobVertex jobVertex, Integer newParallelism) {
 			jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), newParallelism));
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 28af072a10c..38da82cb7fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -178,6 +178,7 @@ protected void initChannel(SocketChannel ch) {
 			.channel(NioServerSocketChannel.class)
 			.childHandler(initializer);
+		log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
 		final ChannelFuture channel;
 		if (restBindAddress == null) {
 			channel = bootstrap.bind(restBindPort);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 4d783352459..9c8f7bd1428 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -89,6 +90,23 @@ public static ActorGateway retrieveLeaderGateway(
 		}
 	}
+	/**
+	 * Retrieves the leader akka url and the current leader session ID. The values are stored in a
+	 * {@link LeaderConnectionInfo} instance.
+	 *
+	 * @param leaderRetrievalService Leader retrieval service to retrieve the leader connection
+	 * information
+	 * @param timeout Timeout when to give up looking for the leader
+	 * @return LeaderConnectionInfo containing the leader's akka URL and the current leader session ID
+	 * @throws LeaderRetrievalException if the leader connection information could not be retrieved
+	 */
+	public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
+			LeaderRetrievalService leaderRetrievalService,
+			Time timeout) throws LeaderRetrievalException {
+		return retrieveLeaderConnectionInfo(leaderRetrievalService, FutureUtils.toFiniteDuration(timeout));
+	}
+
 	/**
 	 * Retrieves the leader akka url and the current leader session ID. The values are stored in a
 	 * {@link LeaderConnectionInfo} instance.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 8c3d31fc51b..b9cd0c1b720 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -526,22 +526,13 @@ protected String getLockPath(String rootPath) {
 				client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path));
 			} catch (KeeperException.NodeExistsException ignored) {
 				// we have already created the lock
-			} catch (KeeperException.NoNodeException e) {
-				throw new Exception("Cannot lock the node " + path + " since it does not exist.", e);
 			}
 		}
 		boolean success = false;
 		try {
-			byte[] data;
-
-			try {
-				data = client.getData().forPath(path);
-			} catch (Exception e) {
-				throw new Exception("Failed to retrieve state handle data under " + path +
-					" from ZooKeeper.", e);
-			}
+			byte[] data = client.getData().forPath(path);
 			try {
 				RetrievableStateHandle<T> retrievableStateHandle = InstantiationUtil.deserializeObject(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index cb26f4862b1..335199a2807 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -302,9 +302,7 @@ public void removeJobGraph(JobID jobId) throws Exception {
 		}
 		@Override
-		public void releaseJobGraph(JobID jobId) throws Exception {
-			throw new UnsupportedOperationException("Should not be called.");
-		}
+		public void releaseJobGraph(JobID jobId) throws Exception {}
 		@Override
 		public Collection<JobID> getJobIds() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d405fcdcf44..1af10b8c598 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -319,13 +319,13 @@ public void testOnAddedJobGraphWithFinishedJob() throws Throwable {
 		runningJobsRegistry.setJobFinished(TEST_JOB_ID);
 		dispatcher.onAddedJobGraph(TEST_JOB_ID);
-		final CompletableFuture<Throwable> errorFuture = fatalErrorHandler.getErrorFuture();
-
-		final Throwable throwable = errorFuture.get();
+		// wait until the recovery is over
+		dispatcher.getRecoverOperationFuture(TIMEOUT).get();
-		assertThat(throwable, instanceOf(DispatcherException.class));
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
-		fatalErrorHandler.clearError();
+		// check that we did not start executing the added JobGraph
+		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), is(empty()));
 	}
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 5141be039f7..6a623768bb2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.dispatcher;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -73,16 +72,20 @@
 			VoidHistoryServerArchivist.INSTANCE);
 	}
-	@VisibleForTesting
 	void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
 		runAsync(
 			() -> jobReachedGloballyTerminalState(archivedExecutionGraph));
 	}
-	@VisibleForTesting
-	public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+	CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
 		return callAsyncWithoutFencing(
 			() -> getJobTerminationFuture(jobId),
 			timeout).thenCompose(Function.identity());
 	}
+
+	CompletableFuture<Void> getRecoverOperationFuture(@Nonnull Time timeout) {
+		return callAsyncWithoutFencing(
+			this::getRecoveryOperation,
+			timeout).thenCompose(Function.identity());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
index dd0375886a9..b5662c064e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -24,17 +24,25 @@
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
@@ -56,8 +64,13 @@
 import java.io.IOException;
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
 /**
@@ -67,8 +80,8 @@
 	private static final Time TIMEOUT = Time.seconds(10L);
-	@ClassRule
-	public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+	@Rule
+	public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@@ -87,14 +100,14 @@
 	@BeforeClass
 	public static void setupClass() throws IOException {
 		configuration = new Configuration();
-		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
 		configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 		rpcService = new TestingRpcService();
 		blobServer = new BlobServer(configuration, new VoidBlobStore());
 	}
 	@Before
-	public void setup() {
+	public void setup() throws Exception {
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
 	}
@@ -139,7 +152,9 @@ public void testSubmittedJobGraphRelease() throws Exception {
 			final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
 			testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
-			final TestingDispatcher dispatcher = createDispatcher(testingHighAvailabilityServices);
+			final TestingDispatcher dispatcher = createDispatcher(
+				testingHighAvailabilityServices,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
 			dispatcher.start();
@@ -167,7 +182,7 @@ public void testSubmittedJobGraphRelease() throws Exception {
 			// recover the job
 			final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
-			assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+			assertThat(submittedJobGraph, is(notNullValue()));
 			// check that the other submitted job graph store can remove the job graph after the original leader
 			// has lost its leadership
@@ -184,20 +199,145 @@ public void testSubmittedJobGraphRelease() throws Exception {
 		}
 	}
+	/**
+	 * Tests that a standby Dispatcher does not interfere with the clean up of a completed
+	 * job.
+	 */
+	@Test
+	public void testStandbyDispatcherJobExecution() throws Exception {
+		try (final TestingHighAvailabilityServices haServices1 = new TestingHighAvailabilityServices();
+			final TestingHighAvailabilityServices haServices2 = new TestingHighAvailabilityServices();
+			final CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore1 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices1.setSubmittedJobGraphStore(submittedJobGraphStore1);
+			final TestingLeaderElectionService leaderElectionService1 = new TestingLeaderElectionService();
+			haServices1.setDispatcherLeaderElectionService(leaderElectionService1);
+
+			final ZooKeeperSubmittedJobGraphStore submittedJobGraphStore2 = ZooKeeperUtils.createSubmittedJobGraphs(curatorFramework, configuration);
+			haServices2.setSubmittedJobGraphStore(submittedJobGraphStore2);
+			final TestingLeaderElectionService leaderElectionService2 = new TestingLeaderElectionService();
+			haServices2.setDispatcherLeaderElectionService(leaderElectionService2);
+
+			final CompletableFuture<JobGraph> jobGraphFuture = new CompletableFuture<>();
+			final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();
+			final TestingDispatcher dispatcher1 = createDispatcher(
+				haServices1,
+				new TestingJobManagerRunnerFactory(jobGraphFuture, resultFuture));
+
+			final TestingDispatcher dispatcher2 = createDispatcher(
+				haServices2,
+				new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()));
+
+			try {
+				dispatcher1.start();
+				dispatcher2.start();
+
+				leaderElectionService1.isLeader(UUID.randomUUID()).get();
+				final DispatcherGateway dispatcherGateway1 = dispatcher1.getSelfGateway(DispatcherGateway.class);
+
+				final JobGraph jobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+				dispatcherGateway1.submitJob(jobGraph, TIMEOUT).get();
+
+				final CompletableFuture<JobResult> jobResultFuture = dispatcherGateway1.requestJobResult(jobGraph.getJobID(), TIMEOUT);
+
+				jobGraphFuture.get();
+
+				// complete the job
+				resultFuture.complete(new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build());
+
+				final JobResult jobResult = jobResultFuture.get();
+
+				assertThat(jobResult.isSuccess(), is(true));
+
+				// wait for the completion of the job
+				dispatcher1.getJobTerminationFuture(jobGraph.getJobID(), TIMEOUT).get();
+
+				// change leadership
+				leaderElectionService1.notLeader();
+				leaderElectionService2.isLeader(UUID.randomUUID()).get();
+
+				// Dispatcher 2 should not recover any jobs
+				final DispatcherGateway dispatcherGateway2 = dispatcher2.getSelfGateway(DispatcherGateway.class);
+				assertThat(dispatcherGateway2.listJobs(TIMEOUT).get(), is(empty()));
+			} finally {
+				RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+			}
+		}
+	}
+
+	/**
+	 * Tests that a standby {@link Dispatcher} can recover all submitted jobs.
+	 */
+	@Test
+	public void testStandbyDispatcherJobRecovery() throws Exception {
+		try (CuratorFramework curatorFramework = ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+			HighAvailabilityServices haServices = null;
+			Dispatcher dispatcher1 = null;
+			Dispatcher dispatcher2 = null;
+
+			try {
+				haServices = new ZooKeeperHaServices(curatorFramework, rpcService.getExecutor(), configuration, new VoidBlobStore());
+
+				final CompletableFuture<JobGraph> jobGraphFuture1 = new CompletableFuture<>();
+				dispatcher1 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture1, new CompletableFuture<>()));
+				final CompletableFuture<JobGraph> jobGraphFuture2 = new CompletableFuture<>();
+				dispatcher2 = createDispatcher(
+					haServices,
+					new TestingJobManagerRunnerFactory(jobGraphFuture2, new CompletableFuture<>()));
+
+				dispatcher1.start();
+				dispatcher2.start();
+
+				final LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(haServices.getDispatcherLeaderRetriever(), TIMEOUT);
+
+				final DispatcherGateway dispatcherGateway = rpcService.connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionID()), DispatcherGateway.class).get();
+
+				final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+				dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();
+
+				if (dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
+					dispatcher1.shutDown();
+					assertThat(jobGraphFuture2.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				} else {
+					dispatcher2.shutDown();
+					assertThat(jobGraphFuture1.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
+				}
+			} finally {
+				if (dispatcher1 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher1, TIMEOUT);
+				}
+
+				if (dispatcher2 != null) {
+					RpcUtils.terminateRpcEndpoint(dispatcher2, TIMEOUT);
+				}
+
+				if (haServices != null) {
+					haServices.close();
+				}
+			}
+		}
+	}
+
 	@Nonnull
-	private TestingDispatcher createDispatcher(TestingHighAvailabilityServices testingHighAvailabilityServices) throws Exception {
+	private TestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
 		return new TestingDispatcher(
 			rpcService,
-			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+			Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName() + UUID.randomUUID(),
 			configuration,
-			testingHighAvailabilityServices,
+			highAvailabilityServices,
 			new TestingResourceManagerGateway(),
 			blobServer,
 			new HeartbeatServices(1000L, 1000L),
 			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			null,
 			new MemoryArchivedExecutionGraphStore(),
-			new TestingJobManagerRunnerFactory(new CompletableFuture<>(), new CompletableFuture<>()),
+			jobManagerRunnerFactory,
 			testingFatalErrorHandler);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
index da63b7fe046..906e9d5a663 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/EntrypointClusterConfigurationParserFactoryTest.java
@@ -46,7 +46,7 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
 		final String value = "value";
 		final String arg1 = "arg1";
 		final String arg2 = "arg2";
-		final String[] args = {"--configDir", configDir, "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
+		final String[] args = {"--configDir", configDir, "--executionMode", "cluster", "--host", "localhost", "-r", String.valueOf(restPort), String.format("-D%s=%s", key, value), arg1, arg2};
 		final EntrypointClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index 3b9c5786ca4..bf8751547ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -76,7 +76,7 @@ public synchronized SubmittedJobGraph recoverJobGraph(JobID jobId) throws Except
 		verifyIsStarted();
 		if (recoverJobGraphFunction != null) {
-			return recoverJobGraphFunction.applyWithException(jobId, storedJobs);
+			return recoverJobGraphFunction.apply(jobId, storedJobs);
 		} else {
 			return requireNonNull(
 				storedJobs.get(jobId),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Standby Dispatcher locks submitted JobGraphs
> --------------------------------------------
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the
> {{JobGraphs}} can prevent their cleanup, leaving the system in an
> inconsistent state.
> The problem is that we recover the newly added {{JobGraph}} in the
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called
> if we don't have the leadership. Recovering a {{JobGraph}} currently locks
> it. If the {{Dispatcher}} is not the leader, then we won't start that job
> after its recovery. However, we also don't release the {{JobGraph}}, leaving
> it locked.
> There are two possible solutions to the problem. Either we check whether we
> are the leader before recovering jobs, or we say that recovering jobs does
> not lock them; only if we can submit the recovered job do we lock it. The
> latter approach has the advantage that it follows a code path quite similar
> to an initial job submission. Moreover, jobs are currently also recovered at
> other places. In all these places we would need to release the {{JobGraphs}}
> if we cannot submit the recovered {{JobGraph}} (e.g.
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then
> we would have to make sure that no concurrent callback from the
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after
> revoking leadership from the {{Dispatcher}}.
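The merged change above implements the first of these solutions combined with
a late lock: a recovered {{JobGraph}} is only kept once the {{Dispatcher}} has
verified its leadership, and is released otherwise. A condensed sketch of that
decision point, with simplified bodies (not the literal Flink code):

    // Sketch of the leadership check at the heart of the fix. When this
    // returns false, the caller releases the JobGraph again via
    // submittedJobGraphStore.releaseJobGraph(jobId).
    private CompletableFuture<Boolean> tryRunRecoveredJobGraph(JobGraph jobGraph, DispatcherId dispatcherId) throws Exception {
        if (leaderElectionService.hasLeadership(dispatcherId.toUUID())) {
            return waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::runJob)
                .thenApply(ignored -> true);
        }
        return CompletableFuture.completedFuture(false);
    }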
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)