clintropolis commented on code in PR #18873:
URL: https://github.com/apache/druid/pull/18873#discussion_r2667697963
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentCacheManagerFactory.java:
##########
@@ -52,11 +52,12 @@ public SegmentCacheManagerFactory(
this.jsonMapper = mapper;
}
- public SegmentCacheManager manufacturate(File storageDir)
+ public SegmentCacheManager manufacturate(File storageDir, boolean virtualStorage)
{
- final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig().withLocations(
- Collections.singletonList(new StorageLocationConfig(storageDir, null, null))
- );
+ final SegmentLoaderConfig loaderConfig =
+ new SegmentLoaderConfig()
+ .setLocations(Collections.singletonList(new StorageLocationConfig(storageDir, null, null)))
Review Comment:
i know this isn't new, but should this have a size so there's a guard rail that keeps things from getting wrecked if something tries to load too much? There is `tmpStorageBytesPerTask` already on the task config, but it looks like that is used for tmp sorter output, so maybe those files should be managed with StorageLocation too. This doesn't need to be done in this PR, just thinking
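For illustration, the guard rail could look something like the sketch below. `GuardedStorageLocation`, `reserve`, and `release` are hypothetical names for this comment, not Druid's actual `StorageLocation` API:

```java
// Hypothetical sketch of a size guard rail: track reserved bytes and refuse
// loads past a configured max, rather than filling the disk. Not Druid's
// actual StorageLocation; names and shape are illustrative only.
class GuardedStorageLocation
{
  private final long maxBytes;
  private long usedBytes = 0;

  GuardedStorageLocation(final long maxBytes)
  {
    this.maxBytes = maxBytes;
  }

  /** Returns true and reserves space if the segment fits; false otherwise. */
  synchronized boolean reserve(final long segmentBytes)
  {
    if (usedBytes + segmentBytes > maxBytes) {
      return false; // guard rail: refuse rather than wreck the disk
    }
    usedBytes += segmentBytes;
    return true;
  }

  /** Returns previously reserved space, e.g. after a segment is dropped. */
  synchronized void release(final long segmentBytes)
  {
    usedBytes = Math.max(0, usedBytes - segmentBytes);
  }
}
```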
##########
multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java:
##########
@@ -66,48 +65,63 @@
public class ExternalInputSliceReader implements InputSliceReader
{
public static final String SEGMENT_ID = "__external";
+ public static final SegmentDescriptor SEGMENT_DESCRIPTOR = SegmentId.dummy(SEGMENT_ID).toDescriptor();
private final File temporaryDirectory;
public ExternalInputSliceReader(final File temporaryDirectory)
{
this.temporaryDirectory = temporaryDirectory;
}
- @Override
- public int numReadableInputs(InputSlice slice)
+ public static boolean isFileBasedInputSource(final InputSource inputSource)
{
- final ExternalInputSlice externalInputSlice = (ExternalInputSlice) slice;
- return externalInputSlice.getInputSources().size();
+ return !(inputSource instanceof NilInputSource) && !(inputSource instanceof InlineInputSource);
}
@Override
- public ReadableInputs attach(
+ public PhysicalInputSlice attach(
final int inputNumber,
final InputSlice slice,
final CounterTracker counters,
final Consumer<Throwable> warningPublisher
)
{
final ExternalInputSlice externalInputSlice = (ExternalInputSlice) slice;
+ final ChannelCounters inputCounters = counters.channel(CounterNames.inputChannel(inputNumber))
+ .setTotalFiles(slice.fileCount());
+ final List<LoadableSegment> loadableSegments = new ArrayList<>();
- return ReadableInputs.segments(
- () -> Iterators.transform(
- inputSourceSegmentIterator(
- externalInputSlice.getInputSources(),
- externalInputSlice.getInputFormat(),
- externalInputSlice.getSignature(),
- new File(temporaryDirectory, String.valueOf(inputNumber)),
- counters.channel(CounterNames.inputChannel(inputNumber)).setTotalFiles(slice.fileCount()),
- counters.warnings(),
- warningPublisher
- ),
- ReadableInput::segment
- )
- );
+ for (final InputSource inputSource : externalInputSlice.getInputSources()) {
+ // The LoadableSegment generated here does not acquire a real hold, and ends up loading the external data in a
+ // processing thread (when the cursor is created). Ideally, this would be better integrated with the virtual
+ // storage system, giving us storage holds and the ability to load data outside of a processing thread.
Review Comment:
:+1: it feels like this should be possible, `StorageLocation` is pretty
generic, but agree we can worry about this later
##########
processing/src/main/java/org/apache/druid/segment/SegmentReference.java:
##########
@@ -52,23 +57,60 @@ public SegmentReference(
)
{
this.segmentDescriptor = segmentDescriptor;
- closer.register(cleanupHold);
- this.segmentReference = segmentReference.map(closer::register);
+ this.segmentReference = segmentReference;
+ this.cleanupHold = cleanupHold;
}
public SegmentDescriptor getSegmentDescriptor()
{
return segmentDescriptor;
}
+ /**
+ * Returns the actual segment. Do not close the Segment when you are done with it, only close the SegmentReference.
+ */
public Optional<Segment> getSegmentReference()
{
return segmentReference;
}
+ /**
+ * Maps the wrapped segment and returns a reference to it. Closes the reference if the {@link SegmentMapFunction}
+ * throws an exception. Regardless of success or failure of this method, the old reference should be discarded.
+ * Do not call {@link #close()} on the old reference, only call it on the new one.
+ */
+ public SegmentReference map(final SegmentMapFunction segmentMapFn)
Review Comment:
could you update the javadoc at the top of this class to indicate that
callers can call this if they don't have a mapFn to apply to the segment at the
time of creation of the `SegmentReference`?
##########
multi-stage-query/src/main/java/org/apache/druid/msq/input/PhysicalInputSlice.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
+import org.apache.druid.msq.input.stage.ReadablePartitions;
+
+import java.util.List;
+
+/**
+ * An {@link InputSlice} that has been prepared for reading by an {@link InputSliceReader}. Nothing contained in
+ * this class references open resources, so this class is not closeable and does not need to be closed.
+ */
+public class PhysicalInputSlice
Review Comment:
what does 'physical' mean here? obligatory 'naming is hard' comment. Is it basically 'the slice that a leaf processor reads to kick everything off'? If so, should it be `LeafInputSlice` to match other terminology? though i guess i could see how that could be confusing too
##########
multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ReadableInputQueue.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
+import org.apache.druid.msq.exec.std.StandardPartitionReader;
+import org.apache.druid.msq.input.LoadableSegment;
+import org.apache.druid.msq.input.PhysicalInputSlice;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Queue for returning {@link ReadableInput} from a list of {@link PhysicalInputSlice}.
+ *
+ * When closed, this object cancels all pending segment loads and releases all segments that have not yet been
+ * acquired by callers through {@link SegmentReferenceHolder#getSegmentReferenceOnce()}. Callers that have acquired
+ * segment references are responsible for closing those references; they will not be closed by this class.
+ */
+public class ReadableInputQueue implements Closeable
+{
+ private static final Logger log = new Logger(ReadableInputQueue.class);
+
+ /**
+ * Partitions to be read.
+ */
+ @GuardedBy("this")
+ private final Queue<ReadablePartition> readablePartitions = new ArrayDeque<>();
+
+ /**
+ * Segments to be loaded. Some may already be cached locally.
+ */
+ @GuardedBy("this")
+ private final Queue<LoadableSegment> loadableSegments = new ArrayDeque<>();
+
+ /**
+ * Realtime servers to be queried.
+ */
+ @GuardedBy("this")
+ private final Queue<DataServerQueryHandler> queryableServers = new ArrayDeque<>();
+
+ /**
+ * Segments currently being loaded.
+ */
+ @GuardedBy("this")
+ private final Set<AcquireSegmentAction> loadingSegments = new LinkedHashSet<>();
+
+ /**
+ * Segments that have been loaded.
+ */
+ @GuardedBy("this")
+ private final Set<SegmentReferenceHolder> loadedSegments = new LinkedHashSet<>();
+
+ /**
+ * Futures that are generated by loadahead, prior to a call to {@link #nextInput()} being made.
+ */
+ @GuardedBy("this")
+ private final Set<ListenableFuture<ReadableInput>> loadaheadFutures = Sets.newIdentityHashSet();
+
+ private final String queryId;
+ private final StandardPartitionReader partitionReader;
+ private final int loadahead;
+ private final AtomicBoolean loadaheadStarted = new AtomicBoolean(false);
+
+ public ReadableInputQueue(
+ final String queryId,
+ final StandardPartitionReader partitionReader,
+ final List<PhysicalInputSlice> slices,
+ final int loadahead
+ )
+ {
+ this.queryId = queryId;
+ this.partitionReader = partitionReader;
+ this.loadahead = loadahead;
+
+ for (final PhysicalInputSlice slice : slices) {
+ // First add all locally-cached segments, before any non-locally-cached segments. This promotes processing
+ // the locally-cached ones first. It also improves efficiency of loadahead, since once we get to the
+ // set of non-locally-cached segments, the "next" one is always one that is worth loading ahead.
+ for (final LoadableSegment loadableSegment : slice.getLoadableSegments()) {
+ if (loadableSegment.isLikelyCached()) {
+ loadableSegments.add(loadableSegment);
+ }
+ }
+
+ for (final LoadableSegment loadableSegment : slice.getLoadableSegments()) {
+ if (!loadableSegment.isLikelyCached()) {
+ loadableSegments.add(loadableSegment);
+ }
+ }
+
+ queryableServers.addAll(slice.getQueryableServers());
+ slice.getReadablePartitions().forEach(readablePartitions::add);
+ }
+ }
+
+ /**
+ * Starts loading up to {@link #loadahead} segments for future calls to {@link #nextInput()}, if this method
+ * has not yet been called. If it has previously been called, subsequent calls do nothing.
+ */
+ public void startLoadaheadIfNeeded()
+ {
+ if (loadaheadStarted.compareAndSet(false, true)) {
+ for (int i = 0; i < loadahead; i++) {
+ if (!addLoadaheadFuture()) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the number of remaining inputs that can be returned by calls to {@link #nextInput()}.
+ */
+ public int remaining()
+ {
+ synchronized (this) {
+ return readablePartitions.size() + loadableSegments.size() + queryableServers.size() + loadaheadFutures.size();
+ }
+ }
+
+ /**
+ * Returns the next {@link ReadableInput}. The future resolves when the input is ready to read.
+ */
+ @Nullable
+ public ListenableFuture<ReadableInput> nextInput()
+ {
+ ListenableFuture<ReadableInput> future;
+
+ future = nextServerInput();
+ if (future != null) {
+ return future;
+ }
+
+ future = nextChannelInput();
+ if (future != null) {
+ return future;
+ }
+
+ future = nextSegmentInput();
+ if (future != null) {
+ return future;
+ }
+
+ return null;
+ }
+
+ /**
+ * Returns the next input from {@link #queryableServers}, if any. Returns null if none remain.
+ */
+ @Nullable
+ private ListenableFuture<ReadableInput> nextServerInput()
+ {
+ final DataServerQueryHandler handler;
+ synchronized (this) {
+ handler = queryableServers.poll();
+ }
+
+ if (handler == null) {
+ return null;
+ }
+
+ return Futures.immediateFuture(ReadableInput.dataServerQuery(handler));
+ }
+
+ /**
+ * Returns the next input from {@link #readablePartitions}, if any. Returns null if none remain.
+ */
+ @Nullable
+ private ListenableFuture<ReadableInput> nextChannelInput()
+ {
+ final ReadablePartition readablePartition;
+ synchronized (this) {
+ readablePartition = readablePartitions.poll();
+ }
+
+ if (readablePartition == null) {
+ return null;
+ }
+
+ ReadableFrameChannel channel = null;
+ try {
+ channel = partitionReader.openChannel(readablePartition);
+ return Futures.immediateFuture(
+ ReadableInput.channel(
+ channel,
+ partitionReader.frameReader(readablePartition.getStageNumber()),
+ new StagePartition(
+ new StageId(queryId, readablePartition.getStageNumber()),
+ readablePartition.getPartitionNumber()
+ )
+ )
+ );
+ }
+ catch (IOException e) {
+ throw CloseableUtils.closeAndWrapInCatch(e, channel);
+ }
+ }
+
+ /**
+ * Returns the next input from {@link #loadableSegments}, if any. Returns null if none remain.
+ */
+ @Nullable
+ private ListenableFuture<ReadableInput> nextSegmentInput()
+ {
+ // Pick a loadahead future, preferring ones that are already loaded.
+ ListenableFuture<ReadableInput> selectedLoadaheadFuture = null;
+ synchronized (this) {
+ for (ListenableFuture<ReadableInput> f : loadaheadFutures) {
+ if (selectedLoadaheadFuture == null || f.isDone()) {
+ selectedLoadaheadFuture = f;
+ if (f.isDone()) {
+ break;
+ }
+ }
+ }
+
+ if (selectedLoadaheadFuture != null) {
+ loadaheadFutures.remove(selectedLoadaheadFuture);
+ addLoadaheadFuture(); // Replace the one we just took out.
+ return selectedLoadaheadFuture;
+ }
+ }
+
+ return loadNextSegment();
+ }
+
+ /**
+ * Load the next segment from {@link #loadableSegments} and return a future to its reference. Returns null
+ * if {@link #loadableSegments} is empty.
+ */
+ @Nullable
+ private ListenableFuture<ReadableInput> loadNextSegment()
+ {
+ synchronized (this) {
+ final LoadableSegment nextLoadableSegment = loadableSegments.poll();
+ if (nextLoadableSegment == null) {
+ return null;
+ }
+
+ final AcquireSegmentAction acquireSegmentAction = nextLoadableSegment.acquire();
+ loadingSegments.add(acquireSegmentAction);
+ return FutureUtils.transform(
+ acquireSegmentAction.getSegmentFuture(),
+ segment -> {
+ synchronized (ReadableInputQueue.this) {
+ // Transfer segment from "loadingSegments" to "loadedSegments" and return a reference to it.
+ if (loadingSegments.remove(acquireSegmentAction)) {
+ try {
+ final SegmentReferenceHolder referenceHolder = new SegmentReferenceHolder(
+ new SegmentReference(
+ nextLoadableSegment.descriptor(),
+ segment.getReferenceProvider().acquireReference(),
+ acquireSegmentAction // Release the hold when the SegmentReference is closed.
+ ),
+ nextLoadableSegment.inputCounters(),
+ nextLoadableSegment.description()
+ );
+ loadedSegments.add(referenceHolder);
+ return ReadableInput.segment(referenceHolder);
+ }
+ catch (Throwable e) {
+ // Javadoc for segment.acquireReference() suggests it can throw exceptions; handle that here
+ // by closing the original AcquireSegmentAction.
+ throw CloseableUtils.closeAndWrapInCatch(e, acquireSegmentAction);
+ }
+ } else {
+ throw DruidException.defensive(
+ "Segment[%s] removed from loadingSegments before loading complete. It is possible that close() "
+ + "was called with futures in flight.",
+ nextLoadableSegment.descriptor()
+ );
+ }
+ }
+ }
+ );
+ }
+ }
+
+ /**
+ * Calls {@link #loadNextSegment()} and adds the future to {@link #loadaheadFutures}. Returns whether a future
+ * was added.
+ */
+ private boolean addLoadaheadFuture()
+ {
+ final ListenableFuture<ReadableInput> nextFuture = loadNextSegment();
+ if (nextFuture != null) {
+ synchronized (this) {
+ loadaheadFutures.add(nextFuture);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ synchronized (this) {
+ readablePartitions.clear();
+ queryableServers.clear();
+ loadableSegments.clear();
+
+ // Cancel all pending segment loads.
+ for (AcquireSegmentAction acquireSegmentAction : loadingSegments) {
+ CloseableUtils.closeAndSuppressExceptions(
+ acquireSegmentAction,
+ e -> log.warn(e, "Failed to close loadingSegment[%s]", acquireSegmentAction)
Review Comment:
this looks legit, maybe we should decorate `AcquireSegmentAction` to have a
segmentId or something and implement toString
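A minimal sketch of that decoration could look like the following; `NamedAcquireSegmentAction`, its constructor shape, and the `Closeable` delegate are illustrative assumptions, not Druid's actual `AcquireSegmentAction` API:

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical wrapper carrying a segmentId so that log lines like
// "Failed to close loadingSegment[%s]" show which segment was involved.
// Names and the Closeable delegate are stand-ins, not Druid's real API.
class NamedAcquireSegmentAction implements Closeable
{
  private final String segmentId;
  private final Closeable delegate;

  NamedAcquireSegmentAction(final String segmentId, final Closeable delegate)
  {
    this.segmentId = segmentId;
    this.delegate = delegate;
  }

  @Override
  public void close() throws IOException
  {
    delegate.close();
  }

  @Override
  public String toString()
  {
    return "AcquireSegmentAction{segmentId=" + segmentId + "}";
  }
}
```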
##########
multi-stage-query/src/main/java/org/apache/druid/msq/input/RegularLoadableSegment.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.loading.AcquireSegmentAction;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
+
+import javax.annotation.Nullable;
+import java.util.Optional;
+
+/**
+ * Implementation of {@link LoadableSegment} for regular Druid segments loaded via {@link SegmentManager}.
+ * Created by {@link org.apache.druid.msq.input.table.SegmentsInputSliceReader}.
+ */
+public class RegularLoadableSegment implements LoadableSegment
+{
+ private final SegmentManager segmentManager;
+ private final SegmentId segmentId;
+ private final SegmentDescriptor descriptor;
+ @Nullable
+ private final ChannelCounters inputCounters;
+ @Nullable
+ private final CoordinatorClient coordinatorClient;
+ private final boolean isReindex;
+
+ @GuardedBy("this")
+ private boolean acquired;
+
+ /**
+ * Cached DataSegment from local timeline, if available. Null if not in local timeline or if isReindex is true.
+ */
+ @Nullable
+ private final DataSegment cachedDataSegment;
+
+ /**
+ * Memoized supplier for the DataSegment future.
+ */
+ private final Supplier<ListenableFuture<DataSegment>> dataSegmentFutureSupplier;
+
+ /**
+ * Create a new RegularLoadableSegment.
+ *
+ * @param segmentManager segment manager for loading and caching segments
+ * @param segmentId the segment ID to load
+ * @param descriptor segment descriptor for querying
+ * @param inputCounters optional counters for tracking input
+ * @param coordinatorClient optional client for fetching DataSegment from Coordinator when not available locally
+ * @param isReindex true if this is a DML command writing to the same table it's reading from
+ */
+ public RegularLoadableSegment(
+ final SegmentManager segmentManager,
+ final SegmentId segmentId,
+ final SegmentDescriptor descriptor,
+ @Nullable final ChannelCounters inputCounters,
+ @Nullable final CoordinatorClient coordinatorClient,
+ final boolean isReindex
+ )
+ {
+ if (isReindex && coordinatorClient == null) {
+ throw DruidException.defensive("Got isReindex[%s], cannot respect this without a coordinatorClient", isReindex);
+ }
+
+ this.segmentManager = segmentManager;
+ this.segmentId = segmentId;
+ this.descriptor = descriptor;
+ this.inputCounters = inputCounters;
+ this.coordinatorClient = coordinatorClient;
+ this.isReindex = isReindex;
+
+ // Can't rely on local timeline if isReindex; always need to check the Coordinator to confirm the segment
+ // is still active.
+ this.cachedDataSegment = isReindex ? null : getDataSegmentFromLocalTimeline();
+ this.dataSegmentFutureSupplier = Suppliers.memoize(this::fetchDataSegment);
+ }
+
+ @Override
+ public ListenableFuture<DataSegment> dataSegmentFuture()
+ {
+ return Futures.nonCancellationPropagating(dataSegmentFutureSupplier.get());
+ }
+
+ @Override
+ public SegmentDescriptor descriptor()
+ {
+ return descriptor;
+ }
+
+ @Override
+ @Nullable
+ public ChannelCounters inputCounters()
+ {
+ return inputCounters;
+ }
+
+ @Override
+ @Nullable
+ public String description()
+ {
+ return segmentId.toString();
+ }
+
+ @Override
+ public synchronized Optional<Segment> acquireIfCached()
+ {
+ if (acquired) {
+ throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);
+ }
+
+ if (cachedDataSegment != null) {
+ final Optional<Segment> cachedSegment = segmentManager.acquireCachedSegment(cachedDataSegment);
+ if (cachedSegment.isPresent()) {
+ acquired = true;
+ }
+ return cachedSegment;
+ }
+
+ return Optional.empty();
+ }
+
+ @Override
+ public synchronized AcquireSegmentAction acquire()
+ {
+ if (acquired) {
+ throw DruidException.defensive("Segment with descriptor[%s] is already acquired", descriptor);
+ }
+
+ acquired = true;
+
+ if (cachedDataSegment != null) {
+ return segmentManager.acquireSegment(cachedDataSegment);
+ } else {
+ // Create a shim AcquireSegmentAction that doesn't acquire a hold (yet). We can't make a real
+ // AcquireSegmentAction yet because we don't have the DataSegment object. It needs to be fetched
+ // from the Coordinator. That call is deferred until we're actually ready to load the segment, because
+ // we don't make the calls all at once when loading a lot of segments.
+
+ final Closer closer = Closer.create();
+ return new AcquireSegmentAction(
+ Suppliers.memoize(() -> FutureUtils.transformAsync(
+ dataSegmentFutureSupplier.get(),
+ dataSegment -> closer.register(segmentManager.acquireSegment(dataSegment)).getSegmentFuture()
+ )),
+ closer
+ );
+ }
+ }
+
+ /**
+ * Fetches the {@link DataSegment}, either returning it immediately if cached or fetching from the Coordinator.
+ */
+ private ListenableFuture<DataSegment> fetchDataSegment()
+ {
+ if (cachedDataSegment != null) {
+ return Futures.immediateFuture(cachedDataSegment);
+ } else if (coordinatorClient != null) {
+ return coordinatorClient.fetchSegment(
Review Comment:
i find myself wondering if maybe we can do something to make `SegmentManager` more flexible (like, say, back it with a coordinator client to download DataSegments on the fly) so that all of this could go through a SegmentManager, but i haven't really thought this through and maybe it's too weird. either way, i think this is fine for this PR
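The fallback shape being suggested could be sketched roughly like this; `FallbackSegmentLookup` and its `Function` wiring are hypothetical stand-ins for how a SegmentManager might be backed by a coordinator client, not Druid's actual interfaces:

```java
import java.util.function.Function;

// Hypothetical sketch: a lookup that consults the local timeline first and
// falls back to a Coordinator fetch on a miss. The names and generic
// interface are illustrative, not SegmentManager/CoordinatorClient's API.
class FallbackSegmentLookup<K, V>
{
  private final Function<K, V> localLookup;       // e.g. local timeline; returns null on miss
  private final Function<K, V> coordinatorFetch;  // e.g. fetch the DataSegment remotely

  FallbackSegmentLookup(final Function<K, V> localLookup, final Function<K, V> coordinatorFetch)
  {
    this.localLookup = localLookup;
    this.coordinatorFetch = coordinatorFetch;
  }

  V get(final K segmentId)
  {
    final V local = localLookup.apply(segmentId);
    return local != null ? local : coordinatorFetch.apply(segmentId);
  }
}
```

With this shape, callers only ever talk to one object, and the "is it local or do we ask the Coordinator" decision stays internal.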
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]