This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 804bc28 Using LoadingCache instead of Map to cache BundleProcessor
new 022a2ab Merge pull request #13893 from [BEAM-11752] Using
LoadingCache instead of Map to cache BundleProcessor
804bc28 is described below
commit 804bc28310a6093ae06fa1848aa9d5c317a6fbf8
Author: Boyuan Zhang <[email protected]>
AuthorDate: Wed Feb 3 17:19:40 2021 -0800
Using LoadingCache instead of Map to cache BundleProcessor
---
.../java/org/apache/beam/sdk/transforms/DoFn.java | 5 +-
.../fn/harness/control/ProcessBundleHandler.java | 58 ++++++++++++++++------
2 files changed, 46 insertions(+), 17 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 7e27b5f..502add9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -872,7 +872,10 @@ public abstract class DoFn<InputT extends @Nullable
Object, OutputT extends @Nul
* crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is
shutting down and the
* process is about to be killed anyway, so all transient resources will be
released automatically
* by the OS). In these cases, the call may not happen. It will also not be
retried, because in
- * such situations the DoFn instance no longer exists, so there's no
instance to retry it on.
+ * such situations the DoFn instance no longer exists, so there's no
instance to retry it on. In
+ * portable execution (with {@code --experiments=beam_fn_api}), an exception
thrown from {@link
+ * Teardown} will not fail the bundle execution. Instead, an error message
will be written to the SDK
+ * harness log.
*
* <p>Thus, all work that depends on input elements, and all externally
important side effects,
* must be performed in the {@link ProcessElement} or {@link FinishBundle}
methods.
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 4ecb5f5..7181420 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -20,6 +20,7 @@ package org.apache.beam.fn.harness.control;
import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -74,11 +75,13 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.TextFormat;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
@@ -538,7 +541,8 @@ public class ProcessBundleHandler {
/** A cache for {@link BundleProcessor}s. */
public static class BundleProcessorCache {
- private final Map<String, ConcurrentLinkedQueue<BundleProcessor>>
cachedBundleProcessors;
+ private final LoadingCache<String, ConcurrentLinkedQueue<BundleProcessor>>
+ cachedBundleProcessors;
private final Map<String, BundleProcessor> activeBundleProcessors;
@Override
@@ -547,14 +551,32 @@ public class ProcessBundleHandler {
}
BundleProcessorCache() {
- this.cachedBundleProcessors = Maps.newConcurrentMap();
+ this.cachedBundleProcessors =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(Duration.ofMinutes(1L))
+ .removalListener(
+ removalNotification -> {
+ ((ConcurrentLinkedQueue<BundleProcessor>)
removalNotification.getValue())
+ .forEach(
+ bundleProcessor -> {
+ bundleProcessor.shutdown();
+ });
+ })
+ .build(
+ new CacheLoader<String,
ConcurrentLinkedQueue<BundleProcessor>>() {
+ @Override
+ public ConcurrentLinkedQueue<BundleProcessor> load(String
s) throws Exception {
+ return new ConcurrentLinkedQueue<>();
+ }
+ });
// We specifically use a weak hash map so that references will
automatically go out of scope
// and not need to be freed explicitly from the cache.
this.activeBundleProcessors = Collections.synchronizedMap(new
WeakHashMap<>());
}
+ @VisibleForTesting
Map<String, ConcurrentLinkedQueue<BundleProcessor>>
getCachedBundleProcessors() {
- return cachedBundleProcessors;
+ return ImmutableMap.copyOf(cachedBundleProcessors.asMap());
}
/**
@@ -570,8 +592,7 @@ public class ProcessBundleHandler {
String instructionId,
Supplier<BundleProcessor> bundleProcessorSupplier) {
ConcurrentLinkedQueue<BundleProcessor> bundleProcessors =
- cachedBundleProcessors.computeIfAbsent(
- bundleDescriptorId, descriptorId -> new
ConcurrentLinkedQueue<>());
+ cachedBundleProcessors.getUnchecked(bundleDescriptorId);
BundleProcessor bundleProcessor = bundleProcessors.poll();
if (bundleProcessor == null) {
bundleProcessor = bundleProcessorSupplier.get();
@@ -609,16 +630,7 @@ public class ProcessBundleHandler {
/** Shutdown all the cached {@link BundleProcessor}s, running the
tearDown() functions. */
void shutdown() throws Exception {
- for (ConcurrentLinkedQueue<BundleProcessor> bundleProcessors :
- cachedBundleProcessors.values()) {
- for (BundleProcessor bundleProcessor : bundleProcessors) {
- for (ThrowingRunnable tearDownFunction :
bundleProcessor.getTearDownFunctions()) {
- LOG.debug("Tearing down function {}", tearDownFunction);
- tearDownFunction.run();
- }
- }
- }
- cachedBundleProcessors.clear();
+ cachedBundleProcessors.invalidateAll();
}
}
@@ -705,6 +717,20 @@ public class ProcessBundleHandler {
resetFunction.run();
}
}
+
+ void shutdown() {
+ for (ThrowingRunnable tearDownFunction : getTearDownFunctions()) {
+ LOG.debug("Tearing down function {}", tearDownFunction);
+ try {
+ tearDownFunction.run();
+ } catch (Exception e) {
+ LOG.error(
+ "Exceptions are thrown from DoFn.teardown method. Note that it
will not fail the"
+ + " pipeline execution,",
+ e);
+ }
+ }
+ }
}
/**