zorro786 commented on a change in pull request #8154: [FLINK-12167] Reset 
context classloader in run and getOptimizedPlan methods
URL: https://github.com/apache/flink/pull/8154#discussion_r287540080
 
 

 ##########
 File path: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 ##########
 @@ -247,44 +252,48 @@ public static OptimizedPlan getOptimizedPlan(Optimizer 
compiler, Plan p, int par
         */
        public JobSubmissionResult run(PackagedProgram prog, int parallelism)
                        throws ProgramInvocationException, 
ProgramMissingJobException {
-               
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-               if (prog.isUsingProgramEntryPoint()) {
-
-                       final JobWithJars jobWithJars = prog.getPlanWithJars();
-
-                       return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
-               }
-               else if (prog.isUsingInteractiveMode()) {
-                       log.info("Starting program in interactive mode 
(detached: {})", isDetached());
-
-                       final List<URL> libraries = prog.getAllLibraries();
-
-                       ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
-                                       prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
-                                       prog.getSavepointSettings());
-                       ContextEnvironment.setAsContext(factory);
-
-                       try {
-                               // invoke main method
-                               prog.invokeInteractiveModeForExecution();
-                               if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
-                                       throw new 
ProgramMissingJobException("The program didn't contain a Flink job.");
-                               }
-                               if (isDetached()) {
-                                       // in detached mode, we execute the 
whole user code to extract the Flink job, afterwards we run it here
-                                       return ((DetachedEnvironment) 
factory.getLastEnvCreated()).finalizeExecute();
+               final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+               try {
+                       
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+                       if (prog.isUsingProgramEntryPoint()) {
+                               final JobWithJars jobWithJars = 
prog.getPlanWithJars();
+                               return run(jobWithJars, parallelism, 
prog.getSavepointSettings());
+                       }
+                       else if (prog.isUsingInteractiveMode()) {
+                               log.info("Starting program in interactive mode 
(detached: {})", isDetached());
+
+                               final List<URL> libraries = 
prog.getAllLibraries();
+
+                               ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(this, libraries,
+                               prog.getClasspaths(), 
prog.getUserCodeClassLoader(), parallelism, isDetached(),
+                               prog.getSavepointSettings());
+                               ContextEnvironment.setAsContext(factory);
+
+                               try {
+                                       // invoke main method
+                                       
prog.invokeInteractiveModeForExecution();
+                                       if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
+                                               throw new 
ProgramMissingJobException("The program didn't contain a Flink job.");
+                                       }
+                                       if (isDetached()) {
+                                               // in detached mode, we execute 
the whole user code to extract the Flink job, afterwards we run it here
+                                               return ((DetachedEnvironment) 
factory.getLastEnvCreated()).finalizeExecute();
+                                       }
+                                       else {
+                                               // in blocking mode, we execute 
all Flink jobs contained in the user code and then return here
+                                               return 
this.lastJobExecutionResult;
+                                       }
                                }
-                               else {
-                                       // in blocking mode, we execute all 
Flink jobs contained in the user code and then return here
-                                       return this.lastJobExecutionResult;
+                               finally {
+                               ContextEnvironment.unsetContext();
 
 Review comment:
   Formatting here. I have a reason to force push now and retrigger tests (:
   Hopefully the script to untar is fixed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to