dmvk commented on a change in pull request #18043:
URL: https://github.com/apache/flink/pull/18043#discussion_r766638783



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -169,12 +242,42 @@ public static void setAsContext(
                             mergedConfiguration,
                             userCodeClassLoader,
                             enforceSingleJobExecution,
-                            suppressSysout);
+                            suppressSysout,
+                            allowConfigurations,
+                            errors);
                 };
         initializeContextEnvironment(factory);
     }
 
     public static void unsetAsContext() {
         resetContextEnvironment();
     }
+
+    private List<String> collectNotAllowedConfigurations() {

Review comment:
       I'm not sure about the approach with the configuration diffing here 🤔
   
   Would it make sense to simply make the configurations objects exposed to the 
user immutable? (something along the lines of 
`java.util.Collections#unmodifiableCollection`)?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -282,6 +295,9 @@ private void runApplicationEntryPoint(
             } else {
                 jobIdsFuture.complete(applicationJobIds);
             }
+        } catch (FatalProgramInvocationException e) {

Review comment:
       isn't this already covered by the catch clause bellow + 
`ExceptionUtils.findThrowable`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FatalProgramInvocationException.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.ProgramInvocationException;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Exception signalling that an exception occurred during the execution of the 
jar's main method.
+ *
+ * <p>The job will transition to FAILED state, and it will not be recovered.
+ */
+@Internal
+public class FatalProgramInvocationException extends 
ProgramInvocationException {

Review comment:
       Also PTAL at the `UnsuccessfulExecutionException`, that seems to be 
related.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -779,6 +779,19 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, 
long initializationTi
                                 operatorId, serializedRequest, timeout));
     }
 
+    @Override
+    public void submitFailedJob(FatalProgramInvocationException exception) {

Review comment:
       Why do we need this method? We didn't really execute the job, so there 
is nothing to archive.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
##########
@@ -84,4 +84,13 @@
                     .withDescription(
                             "Whether a Flink Application cluster should shut 
down automatically after its application finishes"
                                     + " (either successfully or as result of a 
failure). Has no effect for other deployment modes.");
+
+    public static final ConfigOption<Boolean> ALLOW_CLIENT_JOB_CONFIGURATIONS =
+            ConfigOptions.key("execution.allow-client-job-configurations")

Review comment:
       `execution.immutable-configuration`?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -93,6 +137,23 @@ public JobExecutionResult execute(StreamGraph streamGraph) 
throws Exception {
         }
     }
 
+    private void checkNotAllowedConfigurations(StreamGraph streamGraph)
+            throws FatalProgramInvocationException {
+        errorMessages.addAll(collectNotAllowedConfigurations());
+        if (!errorMessages.isEmpty()) {
+            // HACK: We shortcut the StreamGraph to jobgraph translation 
because we already
+            // know that the job needs to fail and can derive the jobId.

Review comment:
       The job fails before being submitted, there is no need to generate jobId 
here. Also wouldn't this break with multiple job submission (we support that in 
non-ha setups)?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FatalProgramInvocationException.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.ProgramInvocationException;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Exception signalling that an exception occurred during the execution of the 
jar's main method.
+ *
+ * <p>The job will transition to FAILED state, and it will not be recovered.
+ */
+@Internal
+public class FatalProgramInvocationException extends 
ProgramInvocationException {

Review comment:
       Do we need a new exception type? As far as I can tell this does the same 
thing as ProgramInvocationException (we don't recover from that one either -> 
so it's also "fatal").

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
##########
@@ -789,4 +790,50 @@ public void configure(ReadableConfig configuration) {
                 .getOptional(CheckpointingOptions.CHECKPOINTS_DIRECTORY)
                 .ifPresent(this::setCheckpointStorage);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CheckpointConfig that = (CheckpointConfig) o;
+        return checkpointInterval == that.checkpointInterval

Review comment:
       this seems fragile, when the new field will be added, this could not get 
updated. Could we do the comparison of the serialized form (original vs user) 
instead? I'd expect the serialized form to be stable within the same JVM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to