gianm commented on code in PR #19397: URL: https://github.com/apache/druid/pull/19397#discussion_r3203838552
########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. 
By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): Review Comment: No reason to include `(e.g. an MSQ frame processor)` here. I would also edit the code sample to be less MSQ specific. ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. 
Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): + * <pre>{@code + * if (asyncHolder == null) { + * asyncHolder = cursorFactory.makeCursorHolderAsync(spec); + * closer.register(asyncHolder); // safe at any lifecycle point — close() handles in-flight loads + * } + * if (!asyncHolder.isReady()) { + * SettableFuture<?> awaitFuture = SettableFuture.create(); + * asyncHolder.addReadyCallback(() -> awaitFuture.set(null)); + * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture)); + * } + * final CursorHolder holder = asyncHolder.release(); // ownership transfers to the caller + * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ... + * }</pre> + * The wrapper has no blocking accessor by design: callers must wait for {@link #isReady()} via + * {@link #addReadyCallback} (so the wait can be expressed as a yield) and then call {@link #release()}. + */ +public class AsyncCursorHolder implements Closeable +{ + /** + * Already-ready {@link AsyncCursorHolder} backed by an immediately-available {@link CursorHolder}. The default + * synchronous {@link CursorFactory#makeCursorHolderAsync} implementation uses this. 
+ */ + public static AsyncCursorHolder completed(CursorHolder holder) + { + return new AsyncCursorHolder(Futures.immediateFuture(holder)); + } + + /** + * Wraps a {@link ListenableFuture}{@code <CursorHolder>}. Implementations doing real I/O should produce the future + * (e.g. by submitting to an executor) and pass it here. + */ + public static AsyncCursorHolder fromFuture(ListenableFuture<CursorHolder> future) + { + return new AsyncCursorHolder(future); + } + + private final ListenableFuture<CursorHolder> future; + private boolean closed = false; Review Comment: use `@GuardedBy` on `closed` and `disposed`, since synchronization is crucial given how they're used. ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): + * <pre>{@code + * if (asyncHolder == null) { + * asyncHolder = cursorFactory.makeCursorHolderAsync(spec); + * closer.register(asyncHolder); // safe at any lifecycle point — close() handles in-flight loads + * } + * if (!asyncHolder.isReady()) { + * SettableFuture<?> awaitFuture = SettableFuture.create(); + * asyncHolder.addReadyCallback(() -> awaitFuture.set(null)); + * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture)); + * } + * final CursorHolder holder = asyncHolder.release(); // ownership transfers to the caller + * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ... 
+ * }</pre> + * The wrapper has no blocking accessor by design: callers must wait for {@link #isReady()} via + * {@link #addReadyCallback} (so the wait can be expressed as a yield) and then call {@link #release()}. + */ +public class AsyncCursorHolder implements Closeable +{ + /** + * Already-ready {@link AsyncCursorHolder} backed by an immediately-available {@link CursorHolder}. The default + * synchronous {@link CursorFactory#makeCursorHolderAsync} implementation uses this. + */ + public static AsyncCursorHolder completed(CursorHolder holder) + { + return new AsyncCursorHolder(Futures.immediateFuture(holder)); + } + + /** + * Wraps a {@link ListenableFuture}{@code <CursorHolder>}. Implementations doing real I/O should produce the future Review Comment: The `@link` and `@code` seem weird here. Also the "Implementations doing real I/O…" bit doesn't seem necessary. ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): + * <pre>{@code + * if (asyncHolder == null) { + * asyncHolder = cursorFactory.makeCursorHolderAsync(spec); + * closer.register(asyncHolder); // safe at any lifecycle point — close() handles in-flight loads + * } + * if (!asyncHolder.isReady()) { + * SettableFuture<?> awaitFuture = SettableFuture.create(); + * asyncHolder.addReadyCallback(() -> awaitFuture.set(null)); + * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture)); + * } + * final CursorHolder holder = asyncHolder.release(); // ownership transfers to the caller + * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ... 
+ * }</pre> + * The wrapper has no blocking accessor by design: callers must wait for {@link #isReady()} via + * {@link #addReadyCallback} (so the wait can be expressed as a yield) and then call {@link #release()}. + */ +public class AsyncCursorHolder implements Closeable +{ + /** + * Already-ready {@link AsyncCursorHolder} backed by an immediately-available {@link CursorHolder}. The default + * synchronous {@link CursorFactory#makeCursorHolderAsync} implementation uses this. + */ + public static AsyncCursorHolder completed(CursorHolder holder) + { + return new AsyncCursorHolder(Futures.immediateFuture(holder)); + } + + /** + * Wraps a {@link ListenableFuture}{@code <CursorHolder>}. Implementations doing real I/O should produce the future + * (e.g. by submitting to an executor) and pass it here. + */ + public static AsyncCursorHolder fromFuture(ListenableFuture<CursorHolder> future) + { + return new AsyncCursorHolder(future); + } + + private final ListenableFuture<CursorHolder> future; + private boolean closed = false; + private boolean disposed = false; + + private AsyncCursorHolder(ListenableFuture<CursorHolder> future) + { + this.future = future; + // If close() lands before the future completes we still need to dispose the eventually-produced holder. Register + // a callback up-front so the holder is closed whenever it arrives if we've been closed in the meantime. + Futures.addCallback( + future, + new FutureCallback<>() + { + @Override + public void onSuccess(CursorHolder holder) + { + synchronized (AsyncCursorHolder.this) { + if (closed && !disposed) { + disposed = true; + try { + holder.close(); + } + catch (Throwable ignored) { + // Best-effort — we're already in cleanup; nothing to do if close itself fails. + } + } + } + } + + @Override + public void onFailure(Throwable t) + { + // Nothing to dispose: no holder was produced. + } + }, + MoreExecutors.directExecutor() + ); + } + + /** + * Whether {@link #release()} can return without blocking. 
Once true, stays true. Callers that need to wait for + * readiness without blocking the current thread should register a {@link #addReadyCallback} and yield. + */ + public boolean isReady() + { + return future.isDone(); + } + + /** + * Take ownership of the underlying {@link CursorHolder}. After this returns, {@link #close()} on this + * {@code AsyncCursorHolder} is a no-op; the caller is responsible for closing the returned holder. Useful when + * passing the holder to another component (e.g. a cursor-lifecycle manager) that takes ownership of it. + * <p> + * Throws {@link IllegalStateException} if the holder is not yet ready, has already been released, or this wrapper Review Comment: `DruidException.defensive` seems more standard. ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): + * <pre>{@code + * if (asyncHolder == null) { + * asyncHolder = cursorFactory.makeCursorHolderAsync(spec); + * closer.register(asyncHolder); // safe at any lifecycle point — close() handles in-flight loads + * } + * if (!asyncHolder.isReady()) { + * SettableFuture<?> awaitFuture = SettableFuture.create(); + * asyncHolder.addReadyCallback(() -> awaitFuture.set(null)); + * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture)); + * } + * final CursorHolder holder = asyncHolder.release(); // ownership transfers to the caller + * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ... 
+ * }</pre> + * The wrapper has no blocking accessor by design: callers must wait for {@link #isReady()} via + * {@link #addReadyCallback} (so the wait can be expressed as a yield) and then call {@link #release()}. + */ +public class AsyncCursorHolder implements Closeable +{ + /** + * Already-ready {@link AsyncCursorHolder} backed by an immediately-available {@link CursorHolder}. The default + * synchronous {@link CursorFactory#makeCursorHolderAsync} implementation uses this. + */ + public static AsyncCursorHolder completed(CursorHolder holder) + { + return new AsyncCursorHolder(Futures.immediateFuture(holder)); + } + + /** + * Wraps a {@link ListenableFuture}{@code <CursorHolder>}. Implementations doing real I/O should produce the future + * (e.g. by submitting to an executor) and pass it here. + */ + public static AsyncCursorHolder fromFuture(ListenableFuture<CursorHolder> future) + { + return new AsyncCursorHolder(future); + } + + private final ListenableFuture<CursorHolder> future; + private boolean closed = false; + private boolean disposed = false; + + private AsyncCursorHolder(ListenableFuture<CursorHolder> future) + { + this.future = future; + // If close() lands before the future completes we still need to dispose the eventually-produced holder. Register + // a callback up-front so the holder is closed whenever it arrives if we've been closed in the meantime. + Futures.addCallback( + future, + new FutureCallback<>() + { + @Override + public void onSuccess(CursorHolder holder) + { + synchronized (AsyncCursorHolder.this) { + if (closed && !disposed) { + disposed = true; + try { + holder.close(); + } + catch (Throwable ignored) { + // Best-effort — we're already in cleanup; nothing to do if close itself fails. + } + } + } + } + + @Override + public void onFailure(Throwable t) + { + // Nothing to dispose: no holder was produced. + } + }, + MoreExecutors.directExecutor() + ); + } + + /** + * Whether {@link #release()} can return without blocking. 
Once true, stays true. Callers that need to wait for + * readiness without blocking the current thread should register a {@link #addReadyCallback} and yield. + */ + public boolean isReady() + { + return future.isDone(); + } + + /** + * Take ownership of the underlying {@link CursorHolder}. After this returns, {@link #close()} on this + * {@code AsyncCursorHolder} is a no-op; the caller is responsible for closing the returned holder. Useful when + * passing the holder to another component (e.g. a cursor-lifecycle manager) that takes ownership of it. + * <p> + * Throws {@link IllegalStateException} if the holder is not yet ready, has already been released, or this wrapper + * has been closed. Throws if the underlying load failed. Does not block; callers must check {@link #isReady()} + * first (typically by yielding via a {@link #addReadyCallback}-driven wait pattern). + */ + public synchronized CursorHolder release() + { + if (closed) { + throw new IllegalStateException("AsyncCursorHolder is already closed"); + } + if (disposed) { + throw new IllegalStateException("AsyncCursorHolder has already been released"); + } + if (!future.isDone()) { + throw new IllegalStateException("AsyncCursorHolder is not ready yet"); + } + final CursorHolder holder = FutureUtils.getUncheckedImmediately(future); + disposed = true; + return holder; + } + + /** + * Register a callback to fire when {@link #isReady()} becomes true (whether the load succeeded or failed). If the + * holder is already ready, the callback fires synchronously. Multiple callbacks may be registered; each fires once. + */ + public void addReadyCallback(Runnable callback) + { + future.addListener(callback, MoreExecutors.directExecutor()); + } + + /** + * Close the holder. 
Safe at any lifecycle point and idempotent: + * <ul> + * <li>Already-loaded: closes the underlying {@link CursorHolder} immediately.</li> + * <li>Loading in progress: marks for disposal; the produced holder is closed when the load completes.</li> + * <li>Load failed: no-op (nothing to dispose).</li> + * <li>Already closed: no-op.</li> + * </ul> + * Note that closing does NOT cancel an in-flight load — cancelling a future-of-Closeable is the exact lifecycle + * hazard this class exists to prevent. The load completes normally and the resulting holder is closed promptly. + */ + @Override + public synchronized void close() + { + if (closed) { + return; + } + closed = true; + if (disposed) { + // Ownership of the holder was already transferred (via release) or already disposed; nothing to do here. + return; + } + if (future.isDone() && !future.isCancelled()) { + try { + final CursorHolder holder = FutureUtils.getUncheckedImmediately(future); + disposed = true; + holder.close(); + } + catch (Throwable t) { + // Future completed with a failure, no holder to close. Review Comment: Comment isn't entirely right. The `holder.close()` might also have failed. I don't think we want to suppress that. ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. 
an MSQ frame processor): + * <pre>{@code + * if (asyncHolder == null) { + * asyncHolder = cursorFactory.makeCursorHolderAsync(spec); + * closer.register(asyncHolder); // safe at any lifecycle point — close() handles in-flight loads + * } + * if (!asyncHolder.isReady()) { + * SettableFuture<?> awaitFuture = SettableFuture.create(); + * asyncHolder.addReadyCallback(() -> awaitFuture.set(null)); + * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture)); + * } + * final CursorHolder holder = asyncHolder.release(); // ownership transfers to the caller + * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ... + * }</pre> + * The wrapper has no blocking accessor by design: callers must wait for {@link #isReady()} via + * {@link #addReadyCallback} (so the wait can be expressed as a yield) and then call {@link #release()}. + */ +public class AsyncCursorHolder implements Closeable +{ + /** + * Already-ready {@link AsyncCursorHolder} backed by an immediately-available {@link CursorHolder}. The default + * synchronous {@link CursorFactory#makeCursorHolderAsync} implementation uses this. + */ + public static AsyncCursorHolder completed(CursorHolder holder) + { + return new AsyncCursorHolder(Futures.immediateFuture(holder)); + } + + /** + * Wraps a {@link ListenableFuture}{@code <CursorHolder>}. Implementations doing real I/O should produce the future + * (e.g. by submitting to an executor) and pass it here. + */ + public static AsyncCursorHolder fromFuture(ListenableFuture<CursorHolder> future) + { + return new AsyncCursorHolder(future); + } + + private final ListenableFuture<CursorHolder> future; + private boolean closed = false; + private boolean disposed = false; + + private AsyncCursorHolder(ListenableFuture<CursorHolder> future) + { + this.future = future; + // If close() lands before the future completes we still need to dispose the eventually-produced holder. 
Register + // a callback up-front so the holder is closed whenever it arrives if we've been closed in the meantime. + Futures.addCallback( + future, + new FutureCallback<>() + { + @Override + public void onSuccess(CursorHolder holder) + { + synchronized (AsyncCursorHolder.this) { Review Comment: We can avoid having two different `holder.close()` places by attaching the callback in `close` itself. Although, if we avoid futures entirely as suggested in another comment, this comment will become irrelevant. ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. 
Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future Review Comment: canceling (spelling) ########## processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.FutureUtils; + +import java.io.Closeable; + +/** + * Closeable wrapper around an asynchronously-loaded {@link CursorHolder}, returned by + * {@link CursorFactory#makeCursorHolderAsync}. 
Designed to make lifecycle management safe even when the holder is + * still loading: callers receive a single Closeable handle and can register it once with their cleanup machinery, + * regardless of where the underlying load is in its lifecycle. + * <p> + * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) + * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future + * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the + * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. + * <p> + * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): + * <pre>{@code + * if (asyncHolder == null) { + * asyncHolder = cursorFactory.makeCursorHolderAsync(spec); + * closer.register(asyncHolder); // safe at any lifecycle point — close() handles in-flight loads + * } + * if (!asyncHolder.isReady()) { + * SettableFuture<?> awaitFuture = SettableFuture.create(); + * asyncHolder.addReadyCallback(() -> awaitFuture.set(null)); + * return ReturnOrAwait.awaitAllFutures(List.of(awaitFuture)); + * } + * final CursorHolder holder = asyncHolder.release(); // ownership transfers to the caller + * // ... use holder; close it when done (or hand it to a component that owns its lifecycle) ... + * }</pre> + * The wrapper has no blocking accessor by design: callers must wait for {@link #isReady()} via + * {@link #addReadyCallback} (so the wait can be expressed as a yield) and then call {@link #release()}. + */ +public class AsyncCursorHolder implements Closeable +{ + /** + * Already-ready {@link AsyncCursorHolder} backed by an immediately-available {@link CursorHolder}. The default + * synchronous {@link CursorFactory#makeCursorHolderAsync} implementation uses this. 
+ */ + public static AsyncCursorHolder completed(CursorHolder holder) + { + return new AsyncCursorHolder(Futures.immediateFuture(holder)); + } + + /** + * Wraps a {@link ListenableFuture}{@code <CursorHolder>}. Implementations doing real I/O should produce the future + * (e.g. by submitting to an executor) and pass it here. + */ + public static AsyncCursorHolder fromFuture(ListenableFuture<CursorHolder> future) + { + return new AsyncCursorHolder(future); + } + + private final ListenableFuture<CursorHolder> future; + private boolean closed = false; + private boolean disposed = false; + + private AsyncCursorHolder(ListenableFuture<CursorHolder> future) + { + this.future = future; + // If close() lands before the future completes we still need to dispose the eventually-produced holder. Register + // a callback up-front so the holder is closed whenever it arrives if we've been closed in the meantime. + Futures.addCallback( + future, + new FutureCallback<>() + { + @Override + public void onSuccess(CursorHolder holder) + { + synchronized (AsyncCursorHolder.this) { + if (closed && !disposed) { + disposed = true; + try { + holder.close(); + } + catch (Throwable ignored) { + // Best-effort — we're already in cleanup; nothing to do if close itself fails. + } + } + } + } + + @Override + public void onFailure(Throwable t) + { + // Nothing to dispose: no holder was produced. + } + }, + MoreExecutors.directExecutor() + ); + } + + /** + * Whether {@link #release()} can return without blocking. Once true, stays true. Callers that need to wait for + * readiness without blocking the current thread should register a {@link #addReadyCallback} and yield. + */ + public boolean isReady() + { + return future.isDone(); + } + + /** + * Take ownership of the underlying {@link CursorHolder}. After this returns, {@link #close()} on this + * {@code AsyncCursorHolder} is a no-op; the caller is responsible for closing the returned holder. 
Useful when + * passing the holder to another component (e.g. a cursor-lifecycle manager) that takes ownership of it. + * <p> + * Throws {@link IllegalStateException} if the holder is not yet ready, has already been released, or this wrapper + * has been closed. Throws if the underlying load failed. Does not block; callers must check {@link #isReady()} + * first (typically by yielding via a {@link #addReadyCallback}-driven wait pattern). + */ + public synchronized CursorHolder release() + { + if (closed) { + throw new IllegalStateException("AsyncCursorHolder is already closed"); + } + if (disposed) { + throw new IllegalStateException("AsyncCursorHolder has already been released"); + } + if (!future.isDone()) { + throw new IllegalStateException("AsyncCursorHolder is not ready yet"); + } + final CursorHolder holder = FutureUtils.getUncheckedImmediately(future); + disposed = true; + return holder; + } + + /** + * Register a callback to fire when {@link #isReady()} becomes true (whether the load succeeded or failed). If the + * holder is already ready, the callback fires synchronously. Multiple callbacks may be registered; each fires once. + */ + public void addReadyCallback(Runnable callback) + { + future.addListener(callback, MoreExecutors.directExecutor()); + } + + /** + * Close the holder. Safe at any lifecycle point and idempotent: + * <ul> + * <li>Already-loaded: closes the underlying {@link CursorHolder} immediately.</li> + * <li>Loading in progress: marks for disposal; the produced holder is closed when the load completes.</li> + * <li>Load failed: no-op (nothing to dispose).</li> + * <li>Already closed: no-op.</li> + * </ul> + * Note that closing does NOT cancel an in-flight load — cancelling a future-of-Closeable is the exact lifecycle + * hazard this class exists to prevent. The load completes normally and the resulting holder is closed promptly. Review Comment: Really we should structure this to be capable of canceling the in-flight load. 
I think to make it work we'll want to not base this class on futures at all. Something like:

- Accept a `Runnable canceler` in the constructor
- Replace the `future` field with `Either<Throwable, T>`
- Expose `set` and `setException` methods that return `boolean` (true for set accepted, false for set not accepted)
- In `close`, call `canceler` if the set methods haven't been called yet
- Whatever provides the actual `CursorHolder` should close it if `set` returns false
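
For illustration, here is a minimal sketch of the non-future-based shape suggested in the review comment. Everything in it is hypothetical: the class name `CancelableHolder`, the generic bound, and the field layout are assumptions made for the example, not code from the PR. To stay self-contained it uses a `value` field and an `error` field in place of `Either<Throwable, T>`, and it leaves out ready callbacks.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical sketch of a cancelable, non-future-based holder. Not code from the PR. */
final class CancelableHolder<T extends Closeable> implements Closeable
{
  private final Runnable canceler; // asks the producer to abandon the in-flight load
  private T value;                 // stand-in for the "right" side of Either<Throwable, T>
  private Throwable error;         // stand-in for the "left" side
  private boolean settled;         // true once set or setException has been accepted
  private boolean closed;

  CancelableHolder(Runnable canceler)
  {
    this.canceler = Objects.requireNonNull(canceler, "canceler");
  }

  /** Returns true if accepted. On false, the producer still owns {@code v} and should close it. */
  synchronized boolean set(T v)
  {
    Objects.requireNonNull(v, "v");
    if (closed || settled) {
      return false;
    }
    value = v;
    settled = true;
    return true;
  }

  /** Returns true if the failure was recorded, false if the holder was already closed or settled. */
  synchronized boolean setException(Throwable t)
  {
    Objects.requireNonNull(t, "t");
    if (closed || settled) {
      return false;
    }
    error = t;
    settled = true;
    return true;
  }

  synchronized boolean isReady()
  {
    return settled;
  }

  /** Transfers ownership of the value to the caller; a later close() is then a no-op. */
  synchronized T release()
  {
    if (closed) {
      throw new IllegalStateException("already closed");
    }
    if (!settled) {
      throw new IllegalStateException("not ready yet");
    }
    if (error != null) {
      throw new RuntimeException(error);
    }
    if (value == null) {
      throw new IllegalStateException("already released");
    }
    final T v = value;
    value = null;
    return v;
  }

  @Override
  public void close()
  {
    final boolean cancel;
    final T toClose;
    synchronized (this) {
      if (closed) {
        return; // idempotent
      }
      closed = true;
      cancel = !settled;
      toClose = value;
      value = null;
    }
    // Run outside the lock so a canceler that calls back into set()/setException() cannot deadlock.
    if (cancel) {
      canceler.run();
    } else if (toClose != null) {
      try {
        toClose.close();
      }
      catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```

In this sketch, a `false` return from `set` is what tells the producer that the wrapper will never take ownership, so the producer closes the value itself. That keeps exactly one owner for the value at every point, which is the property the future-based version had to recover with an up-front callback.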
