This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68910fa5381c8804ddbde3087a2481911ebd6d85 Author: Konstantin Knauf <knauf.konstan...@gmail.com> AuthorDate: Wed May 29 11:16:01 2019 +0200 [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups This closes #8539. --- .../StandaloneJobClusterConfiguration.java | 8 +- ...daloneJobClusterConfigurationParserFactory.java | 6 +- .../entrypoint/StandaloneJobClusterEntryPoint.java | 23 ++++- ...neJobClusterConfigurationParserFactoryTest.java | 6 +- .../StandaloneJobClusterEntryPointTest.java | 97 ++++++++++++++++++++++ 5 files changed, 128 insertions(+), 12 deletions(-) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java index 875a7c5..36cd17e 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java @@ -37,7 +37,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura @Nonnull private final SavepointRestoreSettings savepointRestoreSettings; - @Nonnull + @Nullable private final JobID jobId; @Nullable @@ -50,11 +50,11 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura @Nullable String hostname, int restPort, @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull JobID jobId, + @Nullable JobID jobId, @Nullable String jobClassName) { super(configDir, dynamicProperties, args, hostname, restPort); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); - this.jobId = requireNonNull(jobId, "jobId"); + this.jobId = jobId; this.jobClassName = jobClassName; } @@ -63,7 +63,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura return savepointRestoreSettings; } - @Nonnull + @Nullable JobID getJobId() { return jobId; } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java index 16fa63d..a99e277 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java @@ -29,6 +29,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Properties; @@ -43,8 +44,6 @@ import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST */ public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory<StandaloneJobClusterConfiguration> { - static final JobID DEFAULT_JOB_ID = new JobID(0, 0); - private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") .longOpt("job-classname") .required(false) @@ -105,10 +104,11 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes } } + @Nullable private static JobID getJobId(CommandLine commandLine) throws FlinkParseException { String jobId = commandLine.getOptionValue(JOB_ID_OPTION.getOpt()); if (jobId == null) { - return DEFAULT_JOB_ID; + return null; } try { return JobID.fromHexString(jobId); diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 651a874..d8fd6e1 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; @@ -26,6 +27,7 @@ import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerCo import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory; import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -34,6 +36,8 @@ import org.apache.flink.runtime.util.SignalHandler; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.Optional; + import static java.util.Objects.requireNonNull; /** @@ -42,6 +46,8 @@ import static java.util.Objects.requireNonNull; */ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { + public static final JobID ZERO_JOB_ID = new JobID(0, 0); + @Nonnull private final JobID jobId; @@ -97,11 +103,26 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint( configuration, - clusterConfiguration.getJobId(), + resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration), clusterConfiguration.getSavepointRestoreSettings(), clusterConfiguration.getArgs(), clusterConfiguration.getJobClassName()); ClusterEntrypoint.runClusterEntrypoint(entrypoint); } + + @VisibleForTesting + @Nonnull + static JobID resolveJobIdForCluster(Optional<JobID> optionalJobID, Configuration configuration) { + return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration)); + } + + @Nonnull + private static JobID createJobIdForCluster(Configuration globalConfiguration) { + if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) { + return ZERO_JOB_ID; + } else { + return JobID.generate(); + } + } } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java index f74fc83..4a72d3a 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -36,13 +36,11 @@ import java.io.IOException; import java.util.Optional; import java.util.Properties; -import static org.apache.flink.container.entrypoint.StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -91,7 +89,7 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none()))); - assertThat(clusterConfiguration.getJobId(), is(equalTo(DEFAULT_JOB_ID))); + assertThat(clusterConfiguration.getJobId(), is(nullValue())); } @Test @@ -106,7 +104,7 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); assertThat(clusterConfiguration.getHostname(), is(nullValue())); assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none()))); - assertThat(clusterConfiguration.getJobId(), is(not(nullValue()))); + assertThat(clusterConfiguration.getJobId(), is(nullValue())); assertThat(clusterConfiguration.getJobClassName(), is(nullValue())); } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java new file mode 100644 index 0000000..140ff28 --- /dev/null +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.container.entrypoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Optional; + +import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsNot.not; + +/** + * Tests for the {@link StandaloneJobClusterEntryPoint}. + */ +public class StandaloneJobClusterEntryPointTest extends TestLogger { + + @Test + public void configuredJobIDTakesPrecedenceWithHA() { + Optional<JobID> jobID = Optional.of(JobID.generate()); + + Configuration globalConfiguration = new Configuration(); + enableHighAvailability(globalConfiguration); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(jobID.get())); + } + + @Test + public void configuredJobIDTakesPrecedenceWithoutHA() { + Optional<JobID> jobID = Optional.of(JobID.generate()); + + Configuration globalConfiguration = new Configuration(); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(jobID.get())); + } + + @Test + public void jobIDdefaultsToZeroWithHA() { + Optional<JobID> jobID = Optional.empty(); + + Configuration globalConfiguration = new Configuration(); + enableHighAvailability(globalConfiguration); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(ZERO_JOB_ID)); + } + + @Test + public void jobIDdefaultsToRandomJobIDWithoutHA() { + Optional<JobID> jobID = Optional.empty(); + + Configuration globalConfiguration = new Configuration(); + + JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster( + jobID, + globalConfiguration); + + assertThat(jobIdForCluster, is(not(ZERO_JOB_ID))); + } + + private static void enableHighAvailability(final Configuration configuration) { + configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + } +}