This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68910fa5381c8804ddbde3087a2481911ebd6d85
Author: Konstantin Knauf <knauf.konstan...@gmail.com>
AuthorDate: Wed May 29 11:16:01 2019 +0200

    [FLINK-12617] [container] StandaloneJobClusterEntrypoint defaults to random JobID for non-HA setups
    
    This closes #8539.
---
 .../StandaloneJobClusterConfiguration.java         |  8 +-
 ...daloneJobClusterConfigurationParserFactory.java |  6 +-
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 23 ++++-
 ...neJobClusterConfigurationParserFactoryTest.java |  6 +-
 .../StandaloneJobClusterEntryPointTest.java        | 97 ++++++++++++++++++++++
 5 files changed, 128 insertions(+), 12 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 875a7c5..36cd17e 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -37,7 +37,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
        @Nonnull
        private final SavepointRestoreSettings savepointRestoreSettings;
 
-       @Nonnull
+       @Nullable
        private final JobID jobId;
 
        @Nullable
@@ -50,11 +50,11 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
                        @Nullable String hostname,
                        int restPort,
                        @Nonnull SavepointRestoreSettings savepointRestoreSettings,
-                       @Nonnull JobID jobId,
+                       @Nullable JobID jobId,
                        @Nullable String jobClassName) {
                super(configDir, dynamicProperties, args, hostname, restPort);
                this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
-               this.jobId = requireNonNull(jobId, "jobId");
+               this.jobId = jobId;
                this.jobClassName = jobClassName;
        }
 
@@ -63,7 +63,7 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
                return savepointRestoreSettings;
        }
 
-       @Nonnull
+       @Nullable
        JobID getJobId() {
                return jobId;
        }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 16fa63d..a99e277 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -29,6 +29,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -43,8 +44,6 @@ import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST
  */
 public class StandaloneJobClusterConfigurationParserFactory implements ParserResultFactory<StandaloneJobClusterConfiguration> {
 
-       static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
-
        private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
                .longOpt("job-classname")
                .required(false)
@@ -105,10 +104,11 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
                }
        }
 
+       @Nullable
        private static JobID getJobId(CommandLine commandLine) throws FlinkParseException {
                String jobId = commandLine.getOptionValue(JOB_ID_OPTION.getOpt());
                if (jobId == null) {
-                       return DEFAULT_JOB_ID;
+                       return null;
                }
                try {
                        return JobID.fromHexString(jobId);
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 651a874..d8fd6e1 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerCo
 import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -34,6 +36,8 @@ import org.apache.flink.runtime.util.SignalHandler;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Optional;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -42,6 +46,8 @@ import static java.util.Objects.requireNonNull;
  */
 public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
+       public static final JobID ZERO_JOB_ID = new JobID(0, 0);
+
        @Nonnull
        private final JobID jobId;
 
@@ -97,11 +103,26 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
                StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
                        configuration,
-                       clusterConfiguration.getJobId(),
+                       resolveJobIdForCluster(Optional.ofNullable(clusterConfiguration.getJobId()), configuration),
                        clusterConfiguration.getSavepointRestoreSettings(),
                        clusterConfiguration.getArgs(),
                        clusterConfiguration.getJobClassName());
 
                ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        }
+
+       @VisibleForTesting
+       @Nonnull
+       static JobID resolveJobIdForCluster(Optional<JobID> optionalJobID, Configuration configuration) {
+               return optionalJobID.orElseGet(() -> createJobIdForCluster(configuration));
+       }
+
+       @Nonnull
+       private static JobID createJobIdForCluster(Configuration globalConfiguration) {
+               if (HighAvailabilityMode.isHighAvailabilityModeActivated(globalConfiguration)) {
+                       return ZERO_JOB_ID;
+               } else {
+                       return JobID.generate();
+               }
+       }
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
index f74fc83..4a72d3a 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -36,13 +36,11 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.Properties;
 
-import static org.apache.flink.container.entrypoint.StandaloneJobClusterConfigurationParserFactory.DEFAULT_JOB_ID;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -91,7 +89,7 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg
 
                assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
 
-               assertThat(clusterConfiguration.getJobId(), is(equalTo(DEFAULT_JOB_ID)));
+               assertThat(clusterConfiguration.getJobId(), is(nullValue()));
        }
 
        @Test
@@ -106,7 +104,7 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg
                assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
                assertThat(clusterConfiguration.getHostname(), is(nullValue()));
                assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
-               assertThat(clusterConfiguration.getJobId(), is(not(nullValue())));
+               assertThat(clusterConfiguration.getJobId(), is(nullValue()));
                assertThat(clusterConfiguration.getJobClassName(),  
is(nullValue()));
        }
 
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
new file mode 100644
index 0000000..140ff28
--- /dev/null
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.ZERO_JOB_ID;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNot.not;
+
+/**
+ * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ */
+public class StandaloneJobClusterEntryPointTest extends TestLogger {
+
+       @Test
+       public void configuredJobIDTakesPrecedenceWithHA() {
+               Optional<JobID> jobID = Optional.of(JobID.generate());
+
+               Configuration globalConfiguration = new Configuration();
+               enableHighAvailability(globalConfiguration);
+
+               JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+                       jobID,
+                       globalConfiguration);
+
+               assertThat(jobIdForCluster, is(jobID.get()));
+       }
+
+       @Test
+       public void configuredJobIDTakesPrecedenceWithoutHA() {
+               Optional<JobID> jobID = Optional.of(JobID.generate());
+
+               Configuration globalConfiguration = new Configuration();
+
+               JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+                       jobID,
+                       globalConfiguration);
+
+               assertThat(jobIdForCluster, is(jobID.get()));
+       }
+
+       @Test
+       public void jobIDdefaultsToZeroWithHA() {
+               Optional<JobID> jobID = Optional.empty();
+
+               Configuration globalConfiguration = new Configuration();
+               enableHighAvailability(globalConfiguration);
+
+               JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+                       jobID,
+                       globalConfiguration);
+
+               assertThat(jobIdForCluster, is(ZERO_JOB_ID));
+       }
+
+       @Test
+       public void jobIDdefaultsToRandomJobIDWithoutHA() {
+               Optional<JobID> jobID = Optional.empty();
+
+               Configuration globalConfiguration = new Configuration();
+
+               JobID jobIdForCluster = StandaloneJobClusterEntryPoint.resolveJobIdForCluster(
+                       jobID,
+                       globalConfiguration);
+
+               assertThat(jobIdForCluster, is(not(ZERO_JOB_ID)));
+       }
+
+       private static void enableHighAvailability(final Configuration configuration) {
+               configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+       }
+}

Reply via email to