[ 
https://issues.apache.org/jira/browse/BEAM-5332?focusedWorklogId=142159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142159
 ]

ASF GitHub Bot logged work on BEAM-5332:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Sep/18 12:18
            Start Date: 07/Sep/18 12:18
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6342: [BEAM-5332] Do not 
attempt to evict cache after shutdown
URL: https://github.com/apache/beam/pull/6342
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff after a merge, it is reproduced below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below (otherwise GitHub would not display it):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 565e0b9771b..988a94826fb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -24,7 +24,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -42,7 +41,6 @@
     implements FlinkExecutableStageContext.Factory {
   private static final Logger LOG =
       
LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
-  private static final int TTL_IN_SECONDS = 30;
   private static final int MAX_RETRY = 3;
 
   private final Creator creator;
@@ -106,8 +104,9 @@ private void scheduleRelease(JobInfo jobInfo) {
     WrappedContext wrapper = getCache().get(jobInfo.jobId());
     Preconditions.checkState(
         wrapper != null, "Releasing context for unknown job: " + 
jobInfo.jobId());
-    // Schedule task to clean the container later.
-    getExecutor().schedule(() -> release(wrapper), TTL_IN_SECONDS, 
TimeUnit.SECONDS);
+    // Do not release this asynchronously, as the releasing could fail due to 
the classloader not being
+    // available anymore after the tasks have been removed from the execution 
engine.
+    release(wrapper);
   }
 
   private ConcurrentHashMap<String, WrappedContext> getCache() {
@@ -148,8 +147,8 @@ void release(FlinkExecutableStageContext context) {
         if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
           try {
             wrapper.closeActual();
-          } catch (Exception e) {
-            LOG.error("Unable to close.", e);
+          } catch (Throwable t) {
+            LOG.error("Unable to close FlinkExecutableStageContext.", t);
           }
         }
       }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
index cf758647307..06a43c81b0c 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
@@ -17,10 +17,16 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Charsets;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import 
org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.junit.Assert;
@@ -77,4 +83,33 @@ public void testCreateReuseReleaseCreate() throws Exception {
     Assert.assertNotSame("We should get a new instance.", ac2B, ac4B);
     factory.release(ac4B); // 0 open jobB
   }
+
+  @Test
+  public void testCatchThrowablesAndLogThem() throws Exception {
+    PrintStream oldErr = System.err;
+    oldErr.flush();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream newErr = new PrintStream(baos);
+    try {
+      System.setErr(newErr);
+      Creator creator = mock(Creator.class);
+      FlinkExecutableStageContext c1 = mock(FlinkExecutableStageContext.class);
+      when(creator.apply(any(JobInfo.class))).thenReturn(c1);
+      // throw an Throwable and ensure that it is caught and logged.
+      doThrow(new NoClassDefFoundError()).when(c1).close();
+      ReferenceCountingFlinkExecutableStageContextFactory factory =
+          ReferenceCountingFlinkExecutableStageContextFactory.create(creator);
+      JobInfo jobA = mock(JobInfo.class);
+      when(jobA.jobId()).thenReturn("jobA");
+      FlinkExecutableStageContext ac1A = factory.get(jobA);
+      factory.release(ac1A);
+      newErr.flush();
+      String output = new String(baos.toByteArray(), Charsets.UTF_8);
+      // Ensure that the error is logged
+      assertThat(output.contains("Unable to close 
FlinkExecutableStageContext"), is(true));
+    } finally {
+      newErr.flush();
+      System.setErr(oldErr);
+    }
+  }
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index fec29231a97..a305a59dfba 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -85,19 +85,6 @@ protected DockerJobBundleFactory(JobInfo jobInfo) throws 
Exception {
         provisioningServer);
   }
 
-  @Override
-  public void close() throws Exception {
-    // Clear the cache. This closes all active environments.
-    environmentCache.invalidateAll();
-    environmentCache.cleanUp();
-
-    // Tear down common servers.
-    controlServer.close();
-    loggingServer.close();
-    retrievalServer.close();
-    provisioningServer.close();
-  }
-
   @Override
   protected ServerFactory getServerFactory() {
     switch (getPlatform()) {
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
index 3c1bd6e96b1..d1717268615 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
@@ -68,13 +68,13 @@
 public abstract class JobBundleFactoryBase implements JobBundleFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(JobBundleFactoryBase.class);
 
-  final IdGenerator stageIdGenerator;
-  final GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  final GrpcFnServer<GrpcLoggingService> loggingServer;
-  final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+  private final IdGenerator stageIdGenerator;
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
 
-  final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
+  private final LoadingCache<Environment, WrappedSdkHarnessClient> 
environmentCache;
 
   JobBundleFactoryBase(JobInfo jobInfo) throws Exception {
     ServerFactory serverFactory = getServerFactory();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 142159)
    Time Spent: 20m  (was: 10m)

> SDK harness containers are not eventually shut down after job ends
> ------------------------------------------------------------------
>
>                 Key: BEAM-5332
>                 URL: https://issues.apache.org/jira/browse/BEAM-5332
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.8.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When the job shuts down, the user code classloader is cleared, which makes it
> impossible to load new classes. However, the {{LoadingCache}} attempts to load
> the {{RemovalCause}} class after job shutdown in order to evict the cache.
> We shouldn't attempt to execute code after the job has been removed; this is
> not safe, at least not with Flink.
> {noformat}
> 2018-09-06 15:37:07,996 ERROR 
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory
>   - Unable to close.
> java.lang.NoClassDefFoundError: 
> org/apache/beam/repackaged/beam_runners_java_fn_execution/com/google/common/cache/RemovalCause
>         at 
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3290)
>         at 
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache.clear(LocalCache.java:4322)
>         at 
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4937)
>         at 
> org.apache.beam.runners.fnexecution.control.JobBundleFactoryBase.close(JobBundleFactoryBase.java:186)
>         at 
> org.apache.beam.runners.flink.translation.functions.FlinkBatchExecutableStageContext.close(FlinkBatchExecutableStageContext.java:68)
>         at 
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingFlinkExecutableStageContextFactory.java:186)
>         at 
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingFlinkExecutableStageContextFactory.java:162)
>         at 
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.release(ReferenceCountingFlinkExecutableStageContextFactory.java:150)
>         at 
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.lambda$scheduleRelease$1(ReferenceCountingFlinkExecutableStageContextFactory.java:110)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.RemovalCause
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         ... 16 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to