This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5142be92511 [Java FnApi] Improve FinalizeCallbackManager to cleanup
timeout queue and use recommended ReentrantLock+Condition (#37304)
5142be92511 is described below
commit 5142be925118b4f93daf8cb2d723f4f342bfca88
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Feb 10 10:53:19 2026 +0100
[Java FnApi] Improve FinalizeCallbackManager to cleanup timeout queue and
use recommended ReentrantLock+Condition (#37304)
---
.../fn/harness/control/FinalizeBundleHandler.java | 182 +++++++++++++--------
.../harness/control/FinalizeBundleHandlerTest.java | 49 ++++++
2 files changed, 159 insertions(+), 72 deletions(-)
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java
index d3e9eddf75a..186b0927f16 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java
@@ -23,16 +23,17 @@ import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.FinalizeBundleResponse;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
-import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -45,14 +46,11 @@ import org.joda.time.Instant;
* <p>See <a href="https://s.apache.org/beam-finalizing-bundles">Apache Beam
Portability API: How to
* Finalize Bundles</a> for further details.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
public class FinalizeBundleHandler {
/** A {@link BundleFinalizer.Callback} and expiry time pair. */
@AutoValue
- abstract static class CallbackRegistration {
+ public abstract static class CallbackRegistration {
public static CallbackRegistration create(
Instant expiryTime, BundleFinalizer.Callback callback) {
return new
AutoValue_FinalizeBundleHandler_CallbackRegistration(expiryTime, callback);
@@ -63,77 +61,100 @@ public class FinalizeBundleHandler {
public abstract BundleFinalizer.Callback getCallback();
}
- private final ConcurrentMap<String, Collection<CallbackRegistration>>
bundleFinalizationCallbacks;
- private final PriorityQueue<TimestampedValue<String>> cleanUpQueue;
+ private static class FinalizationInfo {
+ FinalizationInfo(
+ String id, Instant expiryTimestamp, Collection<CallbackRegistration>
callbacks) {
+ this.id = id;
+ this.expiryTimestamp = expiryTimestamp;
+ this.callbacks = callbacks;
+ }
+
+ final String id;
+ final Instant expiryTimestamp;
+ final Collection<CallbackRegistration> callbacks;
+
+ Instant getExpiryTimestamp() {
+ return expiryTimestamp;
+ }
+ }
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition queueMinChanged = lock.newCondition();
- @SuppressWarnings("unused")
- private final Future<Void> cleanUpResult;
+ @GuardedBy("lock")
+ private final HashMap<String, FinalizationInfo> bundleFinalizationCallbacks;
+ @GuardedBy("lock")
+ private final PriorityQueue<FinalizationInfo> cleanUpQueue;
+
+ @SuppressWarnings("methodref.receiver.bound")
public FinalizeBundleHandler(ExecutorService executorService) {
- this.bundleFinalizationCallbacks = new ConcurrentHashMap<>();
+ this.bundleFinalizationCallbacks = new HashMap<>();
this.cleanUpQueue =
- new PriorityQueue<>(11,
Comparator.comparing(TimestampedValue::getTimestamp));
- // Wait until we have at least one element. We are notified on each element
- // being added.
- // Wait until the current time has past the expiry time for the head of the
- // queue.
- // We are notified on each element being added.
- // Wait until we have at least one element. We are notified on each element
- // being added.
- // Wait until the current time has past the expiry time for the head of the
- // queue.
- // We are notified on each element being added.
- cleanUpResult =
- executorService.submit(
- (Callable<Void>)
- () -> {
- while (true) {
- synchronized (cleanUpQueue) {
- TimestampedValue<String> expiryTime =
cleanUpQueue.peek();
-
- // Wait until we have at least one element. We are
notified on each element
- // being added.
- while (expiryTime == null) {
- cleanUpQueue.wait();
- expiryTime = cleanUpQueue.peek();
- }
-
- // Wait until the current time has past the expiry time
for the head of the
- // queue.
- // We are notified on each element being added.
- Instant now = Instant.now();
- while (expiryTime.getTimestamp().isAfter(now)) {
- Duration timeDifference = new Duration(now,
expiryTime.getTimestamp());
- cleanUpQueue.wait(timeDifference.getMillis());
- expiryTime = cleanUpQueue.peek();
- now = Instant.now();
- }
-
-
bundleFinalizationCallbacks.remove(cleanUpQueue.poll().getValue());
- }
- }
- });
+ new PriorityQueue<>(11,
Comparator.comparing(FinalizationInfo::getExpiryTimestamp));
+ executorService.execute(this::cleanupThreadBody);
+ }
+
+ private void cleanupThreadBody() {
+ lock.lock();
+ try {
+ while (true) {
+ final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+ if (minValue == null) {
+ // Wait for an element to be added and loop to re-examine the min.
+ queueMinChanged.await();
+ continue;
+ }
+
+ Instant now = Instant.now();
+ Duration timeDifference = new Duration(now, minValue.expiryTimestamp);
+ if (timeDifference.getMillis() < 0
+ || (queueMinChanged.await(timeDifference.getMillis(),
TimeUnit.MILLISECONDS)
+ && cleanUpQueue.peek() == minValue)) {
+ // The minimum element has an expiry time before now, either because
it had elapsed when
+ // we pulled it or because we awaited it and it is still the minimum.
+ checkState(minValue == cleanUpQueue.poll());
+ checkState(bundleFinalizationCallbacks.remove(minValue.id) ==
minValue);
+ }
+ }
+ } catch (InterruptedException e) {
+ // We're being shutdown.
+ } finally {
+ lock.unlock();
+ }
}
public void registerCallbacks(String bundleId,
Collection<CallbackRegistration> callbacks) {
if (callbacks.isEmpty()) {
return;
}
-
- Collection<CallbackRegistration> priorCallbacks =
- bundleFinalizationCallbacks.putIfAbsent(bundleId, callbacks);
- checkState(
- priorCallbacks == null,
- "Expected to not have any past callbacks for bundle %s but found %s.",
- bundleId,
- priorCallbacks);
- long expiryTimeMillis = Long.MIN_VALUE;
+ Instant maxExpiryTime = Instant.EPOCH;
for (CallbackRegistration callback : callbacks) {
- expiryTimeMillis = Math.max(expiryTimeMillis,
callback.getExpiryTime().getMillis());
+ Instant callbackExpiry = callback.getExpiryTime();
+ if (callbackExpiry.isAfter(maxExpiryTime)) {
+ maxExpiryTime = callbackExpiry;
+ }
}
- synchronized (cleanUpQueue) {
- cleanUpQueue.offer(TimestampedValue.of(bundleId, new
Instant(expiryTimeMillis)));
- cleanUpQueue.notify();
+ final FinalizationInfo info = new FinalizationInfo(bundleId,
maxExpiryTime, callbacks);
+
+ lock.lock();
+ try {
+ FinalizationInfo existingInfo =
bundleFinalizationCallbacks.put(bundleId, info);
+ if (existingInfo != null) {
+ throw new IllegalStateException(
+ "Expected to not have any past callbacks for bundle "
+ + bundleId
+ + " but had "
+ + existingInfo.callbacks);
+ }
+ cleanUpQueue.add(info);
+ @SuppressWarnings("ReferenceEquality")
+ boolean newMin = cleanUpQueue.peek() == info;
+ if (newMin) {
+ queueMinChanged.signal();
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -141,16 +162,24 @@ public class FinalizeBundleHandler {
throws Exception {
String bundleId = request.getFinalizeBundle().getInstructionId();
- Collection<CallbackRegistration> callbacks =
bundleFinalizationCallbacks.remove(bundleId);
-
- if (callbacks == null) {
+ @Nullable FinalizationInfo info;
+ lock.lock();
+ try {
+ info = bundleFinalizationCallbacks.remove(bundleId);
+ if (info != null) {
+ checkState(cleanUpQueue.remove(info));
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (info == null) {
// We have already processed the callbacks on a prior bundle
finalization attempt
return BeamFnApi.InstructionResponse.newBuilder()
.setFinalizeBundle(FinalizeBundleResponse.getDefaultInstance());
}
Collection<Exception> failures = new ArrayList<>();
- for (CallbackRegistration callback : callbacks) {
+ for (CallbackRegistration callback : info.callbacks) {
try {
callback.getCallback().onBundleSuccess();
} catch (Exception e) {
@@ -170,4 +199,13 @@ public class FinalizeBundleHandler {
return BeamFnApi.InstructionResponse.newBuilder()
.setFinalizeBundle(FinalizeBundleResponse.getDefaultInstance());
}
+
+ int cleanupQueueSize() {
+ lock.lock();
+ try {
+ return cleanUpQueue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/FinalizeBundleHandlerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/FinalizeBundleHandlerTest.java
index a760d22b78a..136222a2f01 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/FinalizeBundleHandlerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/FinalizeBundleHandlerTest.java
@@ -22,9 +22,14 @@ import static
org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import
org.apache.beam.fn.harness.control.FinalizeBundleHandler.CallbackRegistration;
@@ -32,6 +37,7 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.FinalizeBundleRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.FinalizeBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -106,6 +112,49 @@ public class FinalizeBundleHandlerTest {
}
}
+ @Test
+ public void testCallbackExpiration() throws Exception {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ FinalizeBundleHandler handler = new FinalizeBundleHandler(executor);
+ BundleFinalizer.Callback callback = mock(BundleFinalizer.Callback.class);
+ handler.registerCallbacks(
+ "test",
+ Collections.singletonList(
+
CallbackRegistration.create(Instant.now().plus(Duration.standardHours(1)),
callback)));
+ assertEquals(1, handler.cleanupQueueSize());
+
+ BundleFinalizer.Callback callback2 = mock(BundleFinalizer.Callback.class);
+ handler.registerCallbacks(
+ "test2",
+ Collections.singletonList(
+
CallbackRegistration.create(Instant.now().plus(Duration.millis(100)),
callback2)));
+ BundleFinalizer.Callback callback3 = mock(BundleFinalizer.Callback.class);
+ handler.registerCallbacks(
+ "test3",
+ Collections.singletonList(
+
CallbackRegistration.create(Instant.now().plus(Duration.millis(1)),
callback3)));
+ while (handler.cleanupQueueSize() > 1) {
+ Thread.sleep(500);
+ }
+ // Just the "test" bundle should remain as "test2" and "test3" should have
timed out.
+ assertEquals(1, handler.cleanupQueueSize());
+ // Completing test2 and test3 should have successful response but not
invoke the callbacks
+ // as they were cleaned up.
+ assertEquals(SUCCESSFUL_RESPONSE,
handler.finalizeBundle(requestFor("test2")).build());
+ verifyNoMoreInteractions(callback2);
+ assertEquals(SUCCESSFUL_RESPONSE,
handler.finalizeBundle(requestFor("test3")).build());
+ verifyNoMoreInteractions(callback3);
+ // Completing "test" bundle should call the callback and remove it from
cleanup queue.
+ assertEquals(1, handler.cleanupQueueSize());
+ assertEquals(SUCCESSFUL_RESPONSE,
handler.finalizeBundle(requestFor("test")).build());
+ verify(callback).onBundleSuccess();
+ assertEquals(0, handler.cleanupQueueSize());
+ // Verify that completing again is a no-op as it was cleaned up.
+ assertEquals(SUCCESSFUL_RESPONSE,
handler.finalizeBundle(requestFor("test")).build());
+ verifyNoMoreInteractions(callback);
+ executor.shutdownNow();
+ }
+
private static InstructionRequest requestFor(String bundleId) {
return InstructionRequest.newBuilder()
.setInstructionId(INSTRUCTION_ID)