Repository: flink Updated Branches: refs/heads/master 31c0754ee -> 463b922ab
[FLINK-8905][rest][client] fix RestClusterClient#getMaxSlots() returning 0 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52475b34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52475b34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52475b34 Branch: refs/heads/master Commit: 52475b3478e78f12d5e2a9ecb10e2bf3d5133687 Parents: 31c0754 Author: Nico Kruber <[email protected]> Authored: Fri Mar 9 11:05:51 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Mar 18 19:12:00 2018 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/client/cli/CliFrontend.java | 3 ++- .../java/org/apache/flink/client/program/ClusterClient.java | 7 ++++++- .../org/apache/flink/client/program/MiniClusterClient.java | 2 +- .../apache/flink/client/program/StandaloneClusterClient.java | 2 +- .../apache/flink/client/program/rest/RestClusterClient.java | 2 +- .../main/java/org/apache/flink/yarn/YarnClusterClient.java | 2 +- 6 files changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/52475b34/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 06131dc..d636ef7 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -86,6 +86,7 @@ import scala.concurrent.duration.FiniteDuration; import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION; +import static org.apache.flink.client.program.ClusterClient.MAX_SLOTS_UNKNOWN; /** * Implementation of a simple command line frontend for executing programs. @@ -262,7 +263,7 @@ public class CliFrontend { int userParallelism = runOptions.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); - if (client.getMaxSlots() != -1 && userParallelism == -1) { + if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) { logAndSysout("Using the parallelism provided by the remote cluster (" + client.getMaxSlots() + "). " + "To use another parallelism, set it at the ./bin/flink client."); http://git-wip-us.apache.org/repos/asf/flink/blob/52475b34/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 1a783fc..b0c50e5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -133,6 +133,11 @@ public abstract class ClusterClient<T> { /** Switch for blocking/detached job submission of the client. */ private boolean detachedJobSubmission = false; + /** + * Value returned by {@link #getMaxSlots()} if the number of maximum slots is unknown. + */ + public static final int MAX_SLOTS_UNKNOWN = -1; + // ------------------------------------------------------------------------ // Construction // ------------------------------------------------------------------------ @@ -1000,7 +1005,7 @@ public abstract class ClusterClient<T> { /** * The client may define an upper limit on the number of slots to use. - * @return -1 if unknown + * @return <tt>-1</tt> ({@link #MAX_SLOTS_UNKNOWN}) if unknown */ public abstract int getMaxSlots(); http://git-wip-us.apache.org/repos/asf/flink/blob/52475b34/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index f0a7631..86e9279 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -181,7 +181,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust @Override public int getMaxSlots() { - return 0; + return MAX_SLOTS_UNKNOWN; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/52475b34/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index 1c9c690..e502add 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -98,7 +98,7 @@ public class StandaloneClusterClient extends ClusterClient<StandaloneClusterId> @Override public int getMaxSlots() { - return -1; + return MAX_SLOTS_UNKNOWN; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/52475b34/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 8cf0d2c..5558461 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -567,7 +567,7 @@ public class RestClusterClient<T> extends ClusterClient<T> { @Override public int getMaxSlots() { - return 0; + return MAX_SLOTS_UNKNOWN; } //------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/52475b34/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index e0010c7..29ece26 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -139,7 +139,7 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> { public int getMaxSlots() { // TODO: this should be retrieved from the running Flink cluster int maxSlots = numberTaskManagers * slotsPerTaskManager; - return maxSlots > 0 ? maxSlots : -1; + return maxSlots > 0 ? maxSlots : MAX_SLOTS_UNKNOWN; } @Override
