clintropolis commented on code in PR #19397:
URL: https://github.com/apache/druid/pull/19397#discussion_r3206538286


##########
processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.java.util.common.Either;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by
+ * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle management safe even when the holder is
+ * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery,
+ * regardless of where the underlying load is in its lifecycle.
+ * <p>
+ * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable)
+ * makes correct cleanup error-prone, where canceling the future or letting a caller fail before consuming the future
+ * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the
+ * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves.
+ * <p>
+ * <h3>Producer protocol</h3>
+ * Producers feed results in via {@link #set(CursorHolder)} or {@link #setException(Throwable)}, both of which return
+ * a boolean. If they return {@code false}, this wrapper has already been closed and the producer is responsible for
+ * closing whatever holder it just produced.
+ * Producers may pass a {@link Runnable} canceler at construction time which runs on {@link #close()} when the wrapper
+ * is closed before the {@link #set} has been called, giving the producer an opportunity to abort its work. The canceler
+ * is best-effort: a producer may have already produced the holder by the time it observes cancellation, in which case
+ * its {@link #set} call will return false and it must close the holder it tried to set.
+ * <p>
+ * <h3>Consumer protocol</h3>
+ * Consumers wait for {@link #isReady()} via {@link #addReadyCallback}, and {@link #release()} to transfer ownership of
+ * the {@link CursorHolder} (or throw the producer exception). Calling {@link #release()} before {@link #isReady()}
+ * returns {@code true}, multiple times, or after this holder has been closed will throw a {@link DruidException}.
+ * <p>
+ * For example (using {@link ReturnOrAwait} to show intended yield-then-resume usage pattern):
+ * <pre>{@code
+ * if (asyncHolder == null) {
+ *     asyncHolder = cursorFactory.makeCursorHolderAsync(spec);
+ *     closer.register(asyncHolder);  // safe at any lifecycle point, close() handles in-flight loads
+ * }
+ * if (!asyncHolder.isReady()) {
+ *     SettableFuture<?> awaitFuture = SettableFuture.create();
+ *     asyncHolder.addReadyCallback(() -> awaitFuture.set(null));
+ *     return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture));
+ * }
+ * final CursorHolder holder = asyncHolder.release();  // ownership transfers to the caller
+ * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ...
+ * }</pre>
+ */
+public class AsyncCursorHolder implements Closeable
+{
+  /**
+   * Completed {@link AsyncCursorHolder} backed by an already available {@link CursorHolder}
+   */
+  public static AsyncCursorHolder completed(CursorHolder holder)
+  {
+    final AsyncCursorHolder result = new AsyncCursorHolder(null);
+    result.set(holder);
+    return result;
+  }
+
+  @Nullable
+  private final Runnable canceler;
+
+  @GuardedBy("this")
+  @Nullable
+  private CursorHolder result = null;
+  @GuardedBy("this")
+  @Nullable
+  private Throwable error = null;
+  @GuardedBy("this")
+  private boolean closed = false;
+  @GuardedBy("this")
+  private boolean disposed = false;
+  @GuardedBy("this")
+  private final List<Runnable> readyCallbacks = new ArrayList<>();
+
+  /**
+   * @param canceler optional callback invoked from {@link #close()} when the wrapper is closed before the load has
+   *                 completed ({@link #set} or {@link #setException}). Producers that support cancellation should
+   *                 provide one; producers that don't can pass {@code null}, in which case {@link #close()} just stops
+   *                 observing the result.
+   */
+  public AsyncCursorHolder(@Nullable Runnable canceler)
+  {
+    this.canceler = canceler;
+  }
+
+  /**
+   * Allows producer to mark the load successful with the given holder. Returns {@code true} if accepted, {@code false}
+   * if this wrapper has already been closed, in which case the producer is responsible for closing {@link CursorHolder}
+   * itself. Throws {@link DruidException} if the load was already completed (from prior calls to this method or
+   * {@link #setException}).
+   * <p>
+   * Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks.
+   */
+  public boolean set(CursorHolder holder)
+  {
+    return setInternal(Either.value(holder));
+  }
+
+  /**
+   * Allows producer to mark the load as failed. Returns {@code true} if accepted, {@code false} if this wrapper has
+   * already been closed (no holder was produced, so there's nothing for the producer to clean up). Throws
+   * {@link DruidException} if the load was already completed (from prior calls to this method or {@link #set}).
+   * <p>
+   * Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks.
+   */
+  public boolean setException(Throwable t)
+  {
+    return setInternal(Either.error(t));
+  }
+
+  private boolean setInternal(Either<Throwable, CursorHolder> value)
+  {
+    final List<Runnable> callbacksToFire;
+    synchronized (this) {
+      if (closed) {
+        return false;
+      }
+      if (result != null || error != null) {
+        throw DruidException.defensive("AsyncCursorHolder is already completed");
+      }
+      if (value.isError()) {
+        error = value.error();
+      } else {
+        result = value.valueOrThrow();
+      }
+      callbacksToFire = drainCallbacks();
+    }
+    fireCallbacks(callbacksToFire);
+    return true;
+  }
+
+  /**
+   * Whether the load has completed (successfully or with failure). Once true, stays true. Callers that need to wait
+   * for readiness without blocking the current thread should register a {@link #addReadyCallback} and yield.
+   */
+  public synchronized boolean isReady()
+  {
+    return result != null || error != null;
+  }
+
+  /**
+   * Take ownership of the underlying {@link CursorHolder}. After this returns, {@link #close()} on this
+   * {@code AsyncCursorHolder} is a no-op; the caller is responsible for closing the returned holder. Useful when
+   * passing the holder to another component (e.g. a cursor-lifecycle manager) that takes ownership of it.
+   * <p>
+   * Throws {@link DruidException} if the holder is not yet ready, has already been released, or this wrapper
+   * has been closed. Wraps and rethrows the failure if the underlying load failed. Does not block; callers must
+   * check {@link #isReady()} first (typically by yielding via a {@link #addReadyCallback}-driven wait pattern).
+   */
+  public synchronized CursorHolder release()
+  {
+    if (closed) {
+      throw DruidException.defensive("AsyncCursorHolder is already closed");
+    }
+    if (disposed) {
+      throw DruidException.defensive("AsyncCursorHolder has already been released");
+    }
+    if (error != null) {
+      // pass through as is
+      if (error instanceof RuntimeException runtime) {
+        throw runtime;
+      }
+      throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+                          .ofCategory(DruidException.Category.UNCATEGORIZED)
+                          .build(error, error.getMessage());
+    }
+    if (result == null) {
+      throw DruidException.defensive("AsyncCursorHolder is not ready yet");
+    }
+    disposed = true;
+    return result;
+  }
+
+  /**
+   * Register a callback to fire when {@link #isReady()} becomes true (whether the load succeeded or failed). If the
+   * holder is already ready, the callback fires synchronously on the calling thread. Otherwise it fires on whatever
+   * thread invokes {@link #set} or {@link #setException}, outside the wrapper's lock so the callback may safely
+   * re-enter the wrapper. Multiple callbacks may be registered; each fires once.
+   */
+  public void addReadyCallback(Runnable callback)
+  {
+    final boolean fireImmediately;
+    synchronized (this) {
+      if (result != null || error != null) {
+        fireImmediately = true;
+      } else {
+        readyCallbacks.add(callback);
+        fireImmediately = false;
+      }
+    }
+    if (fireImmediately) {
+      callback.run();
+    }
+  }
+
+  /**
+   * Close the wrapper. Safe at any lifecycle point and idempotent:
+   * <ul>
+   *   <li>Already-loaded: closes the underlying {@link CursorHolder} immediately.</li>
+   *   <li>Loading in progress: invokes the canceler (if one was supplied at construction). The producer may still
+   *       call {@link #set} / {@link #setException} after this; if the producer wins the race and calls {@code set}
+   *       with a holder, {@code set} returns false and the producer is responsible for closing it.</li>
+   *   <li>Load failed: no-op (no holder was produced).</li>
+   *   <li>Already released or already closed: no-op.</li>
+   * </ul>
+   */
+  @Override
+  public void close()
+  {
+    final CursorHolder holderToClose;
+    final Runnable cancelerToRun;
+    synchronized (this) {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      if (disposed) {
+        // Ownership was already transferred via release(); the caller manages the holder lifecycle.
+        return;
+      }
+      if (result != null) {
+        // Result is here and no one has released it; we close it.

Review Comment:
   I added the nulling out of `result` in `release()`. However, I still have the `disposed` flag, because otherwise calling `release()` would allow something to call `set`/`setException` again, which seems bad. So I've kept this short-circuit on `disposed` here as is and haven't really changed this part.
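
A minimal sketch of why the flag still matters once `result` is nulled. `ReleaseSketch` is a hypothetical, stripped-down stand-in (no locking subtleties, no error path, `IllegalStateException` in place of `DruidException`), not the code in this PR:

```java
// Hypothetical, simplified stand-in for AsyncCursorHolder; not the code from this PR.
final class ReleaseSketch<T>
{
  private T result;          // nulled by release() so the wrapper drops its reference
  private boolean disposed;  // remembers that a result was already handed out

  synchronized void set(T value)
  {
    // After release(), result is null again, so a null check alone would
    // mistake "released" for "not loaded yet" and accept a second result.
    if (disposed || result != null) {
      throw new IllegalStateException("already completed");
    }
    result = value;
  }

  synchronized T release()
  {
    if (disposed) {
      throw new IllegalStateException("already released");
    }
    if (result == null) {
      throw new IllegalStateException("not ready yet");
    }
    final T released = result;
    result = null;
    disposed = true;
    return released;
  }

  public static void main(String[] args)
  {
    final ReleaseSketch<String> sketch = new ReleaseSketch<>();
    sketch.set("holder");
    System.out.println(sketch.release());
    try {
      sketch.set("another holder");
    }
    catch (IllegalStateException e) {
      System.out.println("second set rejected: " + e.getMessage());
    }
  }
}
```

Dropping the `disposed` check from `set` in this sketch would let the second `set` succeed, which is the case the flag is meant to rule out.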
   
   We do essentially always call close on the result if we get to the part where we check that it is not null. Maybe it looks a bit odd because we actually call close and the canceler outside of the synchronized block?
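
For readers without the rest of the diff, here is a rough sketch of the shape being described: decide what to do under the lock, then run the close or the canceler after leaving it. `CloseOutsideLockSketch` is a hypothetical stand-in that uses plain `java.io.Closeable` instead of `CursorHolder` and omits error tracking and ready callbacks. It is an assumption about the general pattern, not the actual remainder of `close()`:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in; error tracking and ready callbacks are omitted.
final class CloseOutsideLockSketch implements Closeable
{
  private final Runnable canceler;  // may be null
  private Closeable result;
  private boolean closed;
  private boolean disposed;

  CloseOutsideLockSketch(Runnable canceler)
  {
    this.canceler = canceler;
  }

  synchronized boolean set(Closeable holder)
  {
    if (closed) {
      return false;  // the producer must close the holder itself
    }
    result = holder;
    return true;
  }

  synchronized Closeable release()
  {
    if (closed || disposed || result == null) {
      throw new IllegalStateException("cannot release");
    }
    final Closeable released = result;
    result = null;
    disposed = true;
    return released;
  }

  @Override
  public void close()
  {
    final Closeable holderToClose;
    final Runnable cancelerToRun;
    synchronized (this) {
      if (closed) {
        return;
      }
      closed = true;
      if (disposed) {
        return;  // ownership already transferred via release()
      }
      // Only pick the action while holding the lock.
      holderToClose = result;
      result = null;
      cancelerToRun = holderToClose == null ? canceler : null;
    }
    // Run foreign code after releasing the lock so it cannot block or re-enter under it.
    if (holderToClose != null) {
      try {
        holderToClose.close();
      }
      catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    if (cancelerToRun != null) {
      cancelerToRun.run();
    }
  }

  public static void main(String[] args)
  {
    final List<String> events = new ArrayList<>();
    final CloseOutsideLockSketch loaded = new CloseOutsideLockSketch(() -> events.add("canceled"));
    loaded.set(() -> events.add("holder closed"));
    loaded.close();
    final CloseOutsideLockSketch pending = new CloseOutsideLockSketch(() -> events.add("canceled"));
    pending.close();
    System.out.println(events);
  }
}
```

In this sketch the `closed` flag is flipped under the lock, so a second `close()` or a late `set` sees a consistent state even though the holder's own `close()` runs afterwards.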



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

