This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 804bc28  Using LoadingCache instead of Map to cache BundleProcessor
     new 022a2ab  Merge pull request #13893 from [BEAM-11752] Using LoadingCache instead of Map to cache BundleProcessor
804bc28 is described below

commit 804bc28310a6093ae06fa1848aa9d5c317a6fbf8
Author: Boyuan Zhang <[email protected]>
AuthorDate: Wed Feb 3 17:19:40 2021 -0800

    Using LoadingCache instead of Map to cache BundleProcessor
---
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |  5 +-
 .../fn/harness/control/ProcessBundleHandler.java   | 58 ++++++++++++++++------
 2 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 7e27b5f..502add9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -872,7 +872,10 @@ public abstract class DoFn<InputT extends @Nullable Object, OutputT extends @Nul
    * crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is 
shutting down and the
    * process is about to be killed anyway, so all transient resources will be 
released automatically
    * by the OS). In these cases, the call may not happen. It will also not be 
retried, because in
-   * such situations the DoFn instance no longer exists, so there's no 
instance to retry it on.
+   * such situations the DoFn instance no longer exists, so there's no 
instance to retry it on. In
+   * portable execution(with {@code --experiments=beam_fn_api}), the exception 
thrown calling {@link
+   * Teardown} will not fail the bundle execution. Instead, an error message 
will be shown on sdk
+   * harness log.
    *
    * <p>Thus, all work that depends on input elements, and all externally 
important side effects,
    * must be performed in the {@link ProcessElement} or {@link FinishBundle} 
methods.
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 4ecb5f5..7181420 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -20,6 +20,7 @@ package org.apache.beam.fn.harness.control;
 import com.google.auto.value.AutoValue;
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -74,11 +75,13 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Message;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.TextFormat;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SetMultimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.joda.time.Instant;
@@ -538,7 +541,8 @@ public class ProcessBundleHandler {
   /** A cache for {@link BundleProcessor}s. */
   public static class BundleProcessorCache {
 
-    private final Map<String, ConcurrentLinkedQueue<BundleProcessor>> 
cachedBundleProcessors;
+    private final LoadingCache<String, ConcurrentLinkedQueue<BundleProcessor>>
+        cachedBundleProcessors;
     private final Map<String, BundleProcessor> activeBundleProcessors;
 
     @Override
@@ -547,14 +551,32 @@ public class ProcessBundleHandler {
     }
 
     BundleProcessorCache() {
-      this.cachedBundleProcessors = Maps.newConcurrentMap();
+      this.cachedBundleProcessors =
+          CacheBuilder.newBuilder()
+              .expireAfterAccess(Duration.ofMinutes(1L))
+              .removalListener(
+                  removalNotification -> {
+                    ((ConcurrentLinkedQueue<BundleProcessor>) 
removalNotification.getValue())
+                        .forEach(
+                            bundleProcessor -> {
+                              bundleProcessor.shutdown();
+                            });
+                  })
+              .build(
+                  new CacheLoader<String, 
ConcurrentLinkedQueue<BundleProcessor>>() {
+                    @Override
+                    public ConcurrentLinkedQueue<BundleProcessor> load(String 
s) throws Exception {
+                      return new ConcurrentLinkedQueue<>();
+                    }
+                  });
       // We specifically use a weak hash map so that references will 
automatically go out of scope
       // and not need to be freed explicitly from the cache.
       this.activeBundleProcessors = Collections.synchronizedMap(new 
WeakHashMap<>());
     }
 
+    @VisibleForTesting
     Map<String, ConcurrentLinkedQueue<BundleProcessor>> 
getCachedBundleProcessors() {
-      return cachedBundleProcessors;
+      return ImmutableMap.copyOf(cachedBundleProcessors.asMap());
     }
 
     /**
@@ -570,8 +592,7 @@ public class ProcessBundleHandler {
         String instructionId,
         Supplier<BundleProcessor> bundleProcessorSupplier) {
       ConcurrentLinkedQueue<BundleProcessor> bundleProcessors =
-          cachedBundleProcessors.computeIfAbsent(
-              bundleDescriptorId, descriptorId -> new 
ConcurrentLinkedQueue<>());
+          cachedBundleProcessors.getUnchecked(bundleDescriptorId);
       BundleProcessor bundleProcessor = bundleProcessors.poll();
       if (bundleProcessor == null) {
         bundleProcessor = bundleProcessorSupplier.get();
@@ -609,16 +630,7 @@ public class ProcessBundleHandler {
 
     /** Shutdown all the cached {@link BundleProcessor}s, running the 
tearDown() functions. */
     void shutdown() throws Exception {
-      for (ConcurrentLinkedQueue<BundleProcessor> bundleProcessors :
-          cachedBundleProcessors.values()) {
-        for (BundleProcessor bundleProcessor : bundleProcessors) {
-          for (ThrowingRunnable tearDownFunction : 
bundleProcessor.getTearDownFunctions()) {
-            LOG.debug("Tearing down function {}", tearDownFunction);
-            tearDownFunction.run();
-          }
-        }
-      }
-      cachedBundleProcessors.clear();
+      cachedBundleProcessors.invalidateAll();
     }
   }
 
@@ -705,6 +717,20 @@ public class ProcessBundleHandler {
         resetFunction.run();
       }
     }
+
+    void shutdown() {
+      for (ThrowingRunnable tearDownFunction : getTearDownFunctions()) {
+        LOG.debug("Tearing down function {}", tearDownFunction);
+        try {
+          tearDownFunction.run();
+        } catch (Exception e) {
+          LOG.error(
+              "Exceptions are thrown from DoFn.teardown method. Note that it 
will not fail the"
+                  + " pipeline execution,",
+              e);
+        }
+      }
+    }
   }
 
   /**

Reply via email to