This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 26.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/26.0.0 by this push:
new 5b9574dd3d Fix two concurrency issues with segment fetching. (#14042)
(#14256)
5b9574dd3d is described below
commit 5b9574dd3dbc1d60d437fb8572d9acb5f0c2d405
Author: Gian Merlino <[email protected]>
AuthorDate: Thu May 11 05:28:54 2023 -0700
Fix two concurrency issues with segment fetching. (#14042) (#14256)
* Fix two concurrency issues with segment fetching. (#14042)
* Fix two concurrency issues with segment fetching.
1) SegmentLocalCacheManager: Fix a concurrency issue where certain directory
cleanup happened outside of directoryWriteRemoveLock. This created the
possibility that segments would be deleted by one thread, while being
actively downloaded by another thread.
2) TaskDataSegmentProvider (MSQ): Fix a concurrency issue when two stages
in the same process both use the same segment. For example: a self-join
using distributed sort-merge. Prior to this change, the two stages could
delete each other's segments.
3) ReferenceCountingResourceHolder: increment() returns a new
ResourceHolder,
rather than a Releaser. This allows it to be passed to callers without
them
having to hold on to both the original ResourceHolder *and* a Releaser.
4) Simplify various interfaces and implementations by using ResourceHolder
instead of Pair and instead of split-up fields.
* Add test.
* Fix style.
* Remove Releaser.
* Updates from master.
* Add some GuardedBys.
* Use the correct GuardedBy.
* Adjustments.
* Remove unnecessary classes.
---
.../druid/msq/exec/TaskDataSegmentProvider.java | 192 +++++++++---
.../input/external/ExternalInputSliceReader.java | 5 +-
.../msq/input/table/SegmentWithDescriptor.java | 42 +--
.../msq/input/table/SegmentsInputSliceReader.java | 8 +-
.../druid/msq/querykit/BaseLeafFrameProcessor.java | 12 +-
.../querykit/BaseLeafFrameProcessorFactory.java | 37 ++-
.../druid/msq/querykit/DataSegmentProvider.java | 11 +-
.../druid/msq/querykit/LazyResourceHolder.java | 26 +-
.../groupby/GroupByPreShuffleFrameProcessor.java | 9 +-
.../GroupByPreShuffleFrameProcessorFactory.java | 4 +-
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 9 +-
.../scan/ScanQueryFrameProcessorFactory.java | 4 +-
.../msq/exec/TaskDataSegmentProviderTest.java | 342 +++++++++++++++++++++
.../querykit/scan/ScanQueryFrameProcessorTest.java | 5 +-
.../druid/msq/test/CalciteMSQTestsHelper.java | 19 +-
.../org/apache/druid/msq/test/MSQTestBase.java | 12 +-
...rceHolder.java => CloseableResourceHolder.java} | 35 ++-
.../ReferenceCountingResourceHolder.java | 21 +-
.../apache/druid/collections/ResourceHolder.java | 6 +
.../epinephelinae/GroupByMergingQueryRunnerV2.java | 6 +-
.../groupby/epinephelinae/ParallelCombiner.java | 5 +-
.../collections/CloseableResourceHolderTest.java} | 28 +-
.../ReferenceCountingResourceHolderTest.java | 2 +-
.../segment/loading/SegmentLocalCacheManager.java | 12 +-
24 files changed, 674 insertions(+), 178 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
index 7c4dc2bb09..18d60f6e4d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java
@@ -19,14 +19,15 @@
package org.apache.druid.msq.exec;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.java.util.common.FileUtils;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.rpc.CoordinatorServiceClient;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
@@ -38,9 +39,13 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
+import javax.annotation.Nullable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
/**
* Production implementation of {@link DataSegmentProvider} using Coordinator
APIs.
@@ -50,6 +55,7 @@ public class TaskDataSegmentProvider implements
DataSegmentProvider
private final CoordinatorServiceClient coordinatorClient;
private final SegmentCacheManager segmentCacheManager;
private final IndexIO indexIO;
+ private final ConcurrentHashMap<SegmentId, SegmentHolder> holders;
public TaskDataSegmentProvider(
CoordinatorServiceClient coordinatorClient,
@@ -60,56 +66,162 @@ public class TaskDataSegmentProvider implements
DataSegmentProvider
this.coordinatorClient = coordinatorClient;
this.segmentCacheManager = segmentCacheManager;
this.indexIO = indexIO;
+ this.holders = new ConcurrentHashMap<>();
}
@Override
- public LazyResourceHolder<Segment> fetchSegment(
+ public Supplier<ResourceHolder<Segment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters
)
{
+ // Returns Supplier<ResourceHolder> instead of ResourceHolder, so the
Coordinator calls and segment downloads happen
+ // in processing threads, rather than the main thread. (They happen when
fetchSegmentInternal is called.)
+ return () -> {
+ ResourceHolder<Segment> holder = null;
+
+ while (holder == null) {
+ holder = holders.computeIfAbsent(
+ segmentId,
+ k -> new SegmentHolder(
+ () -> fetchSegmentInternal(segmentId, channelCounters),
+ () -> holders.remove(segmentId)
+ )
+ ).get();
+ }
+
+ return holder;
+ };
+ }
+
+ /**
+ * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters)}. Does
the actual fetching of a segment, once it
+ * is determined that we definitely need to go out and get one.
+ */
+ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
+ final SegmentId segmentId,
+ final ChannelCounters channelCounters
+ )
+ {
+ final DataSegment dataSegment;
try {
- // Use LazyResourceHolder so Coordinator call and segment downloads
happen in processing threads,
- // rather than the main thread.
- return new LazyResourceHolder<>(
- () -> {
- final DataSegment dataSegment;
- try {
- dataSegment = FutureUtils.get(
- coordinatorClient.fetchUsedSegment(
- segmentId.getDataSource(),
- segmentId.toString()
- ),
- true
- );
- }
- catch (InterruptedException | ExecutionException e) {
- throw new RE(e, "Failed to fetch segment details from
Coordinator for [%s]", segmentId);
- }
+ dataSegment = FutureUtils.get(
+ coordinatorClient.fetchUsedSegment(
+ segmentId.getDataSource(),
+ segmentId.toString()
+ ),
+ true
+ );
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new RE(e, "Failed to fetch segment details from Coordinator for
[%s]", segmentId);
+ }
+
+ final Closer closer = Closer.create();
+ try {
+ if (!segmentCacheManager.reserve(dataSegment)) {
+ throw new ISE("Could not reserve location for segment [%s]",
segmentId);
+ }
+ closer.register(() -> segmentCacheManager.cleanup(dataSegment));
+ final File segmentDir = segmentCacheManager.getSegmentFiles(dataSegment);
- final Closer closer = Closer.create();
+ final QueryableIndex index =
closer.register(indexIO.loadIndex(segmentDir));
+ final QueryableIndexSegment segment = new QueryableIndexSegment(index,
dataSegment.getId());
+ final int numRows = index.getNumRows();
+ final long size = dataSegment.getSize();
+ closer.register(() -> channelCounters.addFile(numRows, size));
+ return new ReferenceCountingResourceHolder<>(segment, closer);
+ }
+ catch (IOException | SegmentLoadingException e) {
+ throw CloseableUtils.closeInCatch(
+ new RE(e, "Failed to download segment [%s]", segmentId),
+ closer
+ );
+ }
+ }
+
+ private static class SegmentHolder implements
Supplier<ResourceHolder<Segment>>
+ {
+ private final Supplier<ResourceHolder<Segment>> holderSupplier;
+ private final Closeable cleanupFn;
+
+ @GuardedBy("this")
+ private ReferenceCountingResourceHolder<Segment> holder;
+
+ @GuardedBy("this")
+ private boolean closing;
+
+ @GuardedBy("this")
+ private boolean closed;
+
+ public SegmentHolder(Supplier<ResourceHolder<Segment>> holderSupplier,
Closeable cleanupFn)
+ {
+ this.holderSupplier = holderSupplier;
+ this.cleanupFn = cleanupFn;
+ }
+
+ @Override
+ @Nullable
+ public ResourceHolder<Segment> get()
+ {
+ synchronized (this) {
+ if (closing) {
+ // Wait until the holder is closed.
+ while (!closed) {
try {
- final File segmentDir =
segmentCacheManager.getSegmentFiles(dataSegment);
- closer.register(() -> FileUtils.deleteDirectory(segmentDir));
-
- final QueryableIndex index = indexIO.loadIndex(segmentDir);
- final int numRows = index.getNumRows();
- final long size = dataSegment.getSize();
- closer.register(() -> channelCounters.addFile(numRows, size));
- closer.register(index);
- return Pair.of(new QueryableIndexSegment(index,
dataSegment.getId()), closer);
+ wait();
}
- catch (IOException | SegmentLoadingException e) {
- throw CloseableUtils.closeInCatch(
- new RE(e, "Failed to download segment [%s]", segmentId),
- closer
- );
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
}
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
+
+ // Then, return null so "fetchSegment" will try again.
+ return null;
+ } else if (holder == null) {
+ final ResourceHolder<Segment> segmentHolder = holderSupplier.get();
+ holder = new ReferenceCountingResourceHolder<>(
+ segmentHolder.get(),
+ () -> {
+ synchronized (this) {
+ CloseableUtils.closeAll(
+ () -> {
+ // synchronized block not strictly needed here, but
errorprone needs it since it doesn't
+ // understand the lambda is immediately called. See
https://errorprone.info/bugpattern/GuardedBy
+ synchronized (this) {
+ closing = true;
+ }
+ },
+ segmentHolder,
+ cleanupFn, // removes this holder from the "holders" map
+ () -> {
+ // synchronized block not strictly needed here, but
errorprone needs it since it doesn't
+ // understand the lambda is immediately called. See
https://errorprone.info/bugpattern/GuardedBy
+ synchronized (this) {
+ closed = true;
+ SegmentHolder.this.notifyAll();
+ }
+ }
+ );
+ }
+ }
+ );
+ final ResourceHolder<Segment> retVal = holder.increment();
+ // Store already-closed holder, so it disappears when the last
reference is closed.
+ holder.close();
+ return retVal;
+ } else {
+ try {
+ return holder.increment();
+ }
+ catch (IllegalStateException e) {
+ // Possible race: holder is in the process of closing. (This is
the only reason "increment" can throw ISE.)
+ // Return null so "fetchSegment" will try again.
+ return null;
+ }
+ }
+ }
}
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
index c609f8b4c1..8e70552847 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.input.external;
import com.google.common.collect.Iterators;
+import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
@@ -32,7 +33,6 @@ import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
@@ -48,7 +48,6 @@ import org.apache.druid.msq.input.NilInputSource;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedSegment;
@@ -234,7 +233,7 @@ public class ExternalInputSliceReader implements
InputSliceReader
);
return new SegmentWithDescriptor(
- new LazyResourceHolder<>(() -> Pair.of(segment, () -> {})),
+ () -> ResourceHolder.fromCloseable(segment),
segmentId.toDescriptor()
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
index 94109bc4a7..020b9f2a5b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java
@@ -24,36 +24,45 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.Segment;
-import java.io.Closeable;
import java.util.Objects;
+import java.util.function.Supplier;
/**
- * A holder for a physical segment.
+ * A holder for a supplier of a physical segment.
*/
-public class SegmentWithDescriptor implements Closeable
+public class SegmentWithDescriptor
{
- private final ResourceHolder<? extends Segment> segmentHolder;
+ private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
private final SegmentDescriptor descriptor;
+ /**
+ * Create a new instance.
+ *
+ * @param segmentSupplier supplier of a {@link ResourceHolder} of segment.
The {@link ResourceHolder#close()} logic
+ * must include a delegated call to {@link
Segment#close()}.
+ * @param descriptor segment descriptor
+ */
public SegmentWithDescriptor(
- final ResourceHolder<? extends Segment> segmentHolder,
+ final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
final SegmentDescriptor descriptor
)
{
- this.segmentHolder = Preconditions.checkNotNull(segmentHolder, "segment");
+ this.segmentSupplier = Preconditions.checkNotNull(segmentSupplier,
"segment");
this.descriptor = Preconditions.checkNotNull(descriptor, "descriptor");
}
/**
* The physical segment.
*
- * Named "getOrLoad" because the segment may be held by an eager or lazy
resource holder (i.e.
- * {@link org.apache.druid.msq.querykit.LazyResourceHolder}). If the
resource holder is lazy, the segment is acquired
+ * Named "getOrLoad" because the segment may be generated by a lazy
supplier. In this case, the segment is acquired
* as part of the call to this method.
+ *
+ * It is not necessary to call {@link Segment#close()} on the returned
segment. Calling {@link ResourceHolder#close()}
+ * is enough.
*/
- public Segment getOrLoadSegment()
+ public ResourceHolder<Segment> getOrLoad()
{
- return segmentHolder.get();
+ return segmentSupplier.get();
}
/**
@@ -64,15 +73,6 @@ public class SegmentWithDescriptor implements Closeable
return descriptor;
}
- /**
- * Release resources used by the physical segment.
- */
- @Override
- public void close()
- {
- segmentHolder.close();
- }
-
@Override
public boolean equals(Object o)
{
@@ -83,12 +83,12 @@ public class SegmentWithDescriptor implements Closeable
return false;
}
SegmentWithDescriptor that = (SegmentWithDescriptor) o;
- return Objects.equals(segmentHolder, that.segmentHolder) &&
Objects.equals(descriptor, that.descriptor);
+ return Objects.equals(segmentSupplier, that.segmentSupplier) &&
Objects.equals(descriptor, that.descriptor);
}
@Override
public int hashCode()
{
- return Objects.hash(segmentHolder, descriptor);
+ return Objects.hash(segmentSupplier, descriptor);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
index 29236d7b49..f7efd287ae 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
@@ -20,7 +20,6 @@
package org.apache.druid.msq.input.table;
import com.google.common.collect.Iterators;
-import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterTracker;
@@ -29,7 +28,6 @@ import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.ReadableInputs;
import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;
import java.util.Iterator;
@@ -92,8 +90,10 @@ public class SegmentsInputSliceReader implements
InputSliceReader
descriptor.getPartitionNumber()
);
- final ResourceHolder<Segment> segmentHolder =
dataSegmentProvider.fetchSegment(segmentId, channelCounters);
- return new SegmentWithDescriptor(segmentHolder, descriptor);
+ return new SegmentWithDescriptor(
+ dataSegmentProvider.fetchSegment(segmentId, channelCounters),
+ descriptor
+ );
}
).iterator();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index e8b761c800..ece5a6e145 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -55,7 +55,7 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
private final Query<?> query;
private final ReadableInput baseInput;
private final List<ReadableFrameChannel> inputChannels;
- private final ResourceHolder<WritableFrameChannel> outputChannel;
+ private final ResourceHolder<WritableFrameChannel> outputChannelHolder;
private final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder;
private final BroadcastJoinHelper broadcastJoinHelper;
@@ -66,14 +66,14 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final JoinableFactoryWrapper joinableFactory,
- final ResourceHolder<WritableFrameChannel> outputChannel,
+ final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final long memoryReservedForBroadcastJoin
)
{
this.query = query;
this.baseInput = baseInput;
- this.outputChannel = outputChannel;
+ this.outputChannelHolder = outputChannelHolder;
this.frameWriterFactoryHolder = frameWriterFactoryHolder;
final Pair<List<ReadableFrameChannel>, BroadcastJoinHelper>
inputChannelsAndBroadcastJoinHelper =
@@ -150,7 +150,7 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
@Override
public List<WritableFrameChannel> outputChannels()
{
- return Collections.singletonList(outputChannel.get());
+ return Collections.singletonList(outputChannelHolder.get());
}
@Override
@@ -171,9 +171,7 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
@Override
public void cleanup() throws IOException
{
- // Don't close the output channel, because multiple workers write to the
same channel.
- // The channel should be closed by the caller.
- FrameProcessors.closeAll(inputChannels(), Collections.emptyList(),
outputChannel, frameWriterFactoryHolder);
+ FrameProcessors.closeAll(inputChannels(), Collections.emptyList(),
outputChannelHolder, frameWriterFactoryHolder);
}
protected FrameWriterFactory getFrameWriterFactory()
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index 9f5a7462b4..5305afa2ce 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -32,7 +32,6 @@ import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
@@ -254,7 +253,7 @@ public abstract class BaseLeafFrameProcessorFactory extends
BaseFrameProcessorFa
protected abstract FrameProcessor<Long> makeProcessor(
ReadableInput baseInput,
Int2ObjectMap<ReadableInput> sideChannels,
- ResourceHolder<WritableFrameChannel> outputChannelSupplier,
+ ResourceHolder<WritableFrameChannel> outputChannel,
ResourceHolder<FrameWriterFactory> frameWriterFactory,
FrameContext providerThingy
);
@@ -272,21 +271,29 @@ public abstract class BaseLeafFrameProcessorFactory
extends BaseFrameProcessorFa
resource = queueRef.get().poll();
}
- return Pair.of(
- resource,
- () -> {
- synchronized (queueRef) {
- final Queue<T> queue = queueRef.get();
- if (queue != null) {
- queue.add(resource);
- return;
- }
- }
+ return new ResourceHolder<T>()
+ {
+ @Override
+ public T get()
+ {
+ return resource;
+ }
- // Queue was null
- backupCloser.accept(resource);
+ @Override
+ public void close()
+ {
+ synchronized (queueRef) {
+ final Queue<T> queue = queueRef.get();
+ if (queue != null) {
+ queue.add(resource);
+ return;
+ }
}
- );
+
+ // Queue was null
+ backupCloser.accept(resource);
+ }
+ };
}
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
index b5d95a4401..5f5f6099e5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java
@@ -24,13 +24,18 @@ import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;
+import java.util.function.Supplier;
+
public interface DataSegmentProvider
{
/**
- * Fetches the segment corresponding to the provided segmentId from deep
storage,
- * segment fetched.
+ * Returns a supplier that fetches the segment corresponding to the provided
segmentId from deep storage. The segment
+ * is not actually fetched until you call {@link Supplier#get()}. Once you
call this, make sure to also call
+ * {@link ResourceHolder#close()}.
+ *
+ * It is not necessary to call {@link ResourceHolder#close()} if you never
call {@link Supplier#get()}.
*/
- ResourceHolder<Segment> fetchSegment(
+ Supplier<ResourceHolder<Segment>> fetchSegment(
SegmentId segmentId,
ChannelCounters channelCounters
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
index 4ae03c5dcf..e9a897cd16 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/LazyResourceHolder.java
@@ -21,11 +21,9 @@ package org.apache.druid.msq.querykit;
import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.concurrent.NotThreadSafe;
-import java.io.Closeable;
import java.util.function.Supplier;
@NotThreadSafe
@@ -33,11 +31,10 @@ public class LazyResourceHolder<T> implements
ResourceHolder<T>
{
private static final Logger log = new Logger(LazyResourceHolder.class);
- private final Supplier<Pair<T, Closeable>> supplier;
- private T resource = null;
- private Closeable closer = null;
+ private final Supplier<ResourceHolder<T>> supplier;
+ private ResourceHolder<T> supplied = null;
- public LazyResourceHolder(final Supplier<Pair<T, Closeable>> supplier)
+ public LazyResourceHolder(final Supplier<ResourceHolder<T>> supplier)
{
this.supplier = Preconditions.checkNotNull(supplier, "supplier");
}
@@ -45,28 +42,25 @@ public class LazyResourceHolder<T> implements
ResourceHolder<T>
@Override
public T get()
{
- if (resource == null) {
- final Pair<T, Closeable> supplied = supplier.get();
- resource = Preconditions.checkNotNull(supplied.lhs, "resource");
- closer = Preconditions.checkNotNull(supplied.rhs, "closer");
+ if (supplied == null) {
+ supplied = supplier.get();
}
- return resource;
+ return supplied.get();
}
@Override
public void close()
{
- if (resource != null) {
+ if (supplied != null) {
try {
- closer.close();
+ supplied.close();
}
catch (Throwable e) {
- log.noStackTrace().warn(e, "Exception encountered while closing
resource: %s", resource);
+ log.noStackTrace().warn(e, "Exception encountered while closing
resource: %s", supplied.get());
}
finally {
- resource = null;
- closer = null;
+ supplied = null;
}
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 5bcc4267b9..47427be85a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -48,6 +48,7 @@ import
org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.timeline.SegmentId;
@@ -76,7 +77,7 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
final Int2ObjectMap<ReadableInput> sideChannels,
final GroupByStrategySelector strategySelector,
final JoinableFactoryWrapper joinableFactory,
- final ResourceHolder<WritableFrameChannel> outputChannel,
+ final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final long memoryReservedForBroadcastJoin
)
@@ -86,7 +87,7 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
baseInput,
sideChannels,
joinableFactory,
- outputChannel,
+ outputChannelHolder,
frameWriterFactoryHolder,
memoryReservedForBroadcastJoin
);
@@ -103,13 +104,13 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
protected ReturnOrAwait<Long> runWithSegment(final SegmentWithDescriptor
segment) throws IOException
{
if (resultYielder == null) {
- closer.register(segment);
+ final ResourceHolder<Segment> segmentHolder =
closer.register(segment.getOrLoad());
final Sequence<ResultRow> rowSequence =
strategySelector.strategize(query)
.process(
query.withQuerySegmentSpec(new
SpecificSegmentSpec(segment.getDescriptor())),
-
mapSegment(segment.getOrLoadSegment()).asStorageAdapter(),
+
mapSegment(segmentHolder.get()).asStorageAdapter(),
null
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
index 285e75eaa5..207a5ae6a2 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
@@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.write.FrameWriterFactory;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
@@ -69,7 +67,7 @@ public class GroupByPreShuffleFrameProcessorFactory extends
BaseLeafFrameProcess
frameContext.groupByStrategySelector(),
new JoinableFactoryWrapper(frameContext.joinableFactory()),
outputChannelHolder,
- new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(),
frameWriterFactoryHolder)),
+ frameWriterFactoryHolder,
frameContext.memoryParameters().getBroadcastJoinMemory()
);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 0482e2715d..f843c189e3 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -54,6 +54,7 @@ import
org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
@@ -90,7 +91,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final JoinableFactoryWrapper joinableFactory,
- final ResourceHolder<WritableFrameChannel> outputChannel,
+ final ResourceHolder<WritableFrameChannel> outputChannelHolder,
final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
@Nullable final AtomicLong runningCountForLimit,
final long memoryReservedForBroadcastJoin,
@@ -102,7 +103,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
baseInput,
sideChannels,
joinableFactory,
- outputChannel,
+ outputChannelHolder,
frameWriterFactoryHolder,
memoryReservedForBroadcastJoin
);
@@ -152,12 +153,12 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
protected ReturnOrAwait<Long> runWithSegment(final SegmentWithDescriptor
segment) throws IOException
{
if (cursor == null) {
- closer.register(segment);
+ final ResourceHolder<Segment> segmentHolder =
closer.register(segment.getOrLoad());
final Yielder<Cursor> cursorYielder = Yielders.each(
makeCursors(
query.withQuerySegmentSpec(new
SpecificSegmentSpec(segment.getDescriptor())),
- mapSegment(segment.getOrLoadSegment()).asStorageAdapter()
+ mapSegment(segmentHolder.get()).asStorageAdapter()
)
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index bc8ec9608e..1f1263ef76 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -28,11 +28,9 @@ import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.write.FrameWriterFactory;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
@@ -84,7 +82,7 @@ public class ScanQueryFrameProcessorFactory extends
BaseLeafFrameProcessorFactor
sideChannels,
new JoinableFactoryWrapper(frameContext.joinableFactory()),
outputChannelHolder,
- new LazyResourceHolder<>(() -> Pair.of(frameWriterFactoryHolder.get(),
frameWriterFactoryHolder)),
+ frameWriterFactoryHolder,
runningCountForLimit,
frameContext.memoryParameters().getBroadcastJoinMemory(),
frameContext.jsonMapper()
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
new file mode 100644
index 0000000000..b1a97b3da0
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.rpc.CoordinatorServiceClient;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.segment.DimensionHandler;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.Metadata;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentLocalCacheManager;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public class TaskDataSegmentProviderTest
+{
+ private static final String DATASOURCE = "foo";
+ private static final int NUM_SEGMENTS = 10;
+ private static final int THREADS = 8;
+ private static final String LOAD_SPEC_FILE_NAME = "data";
+
+ private List<DataSegment> segments;
+ private File cacheDir;
+ private SegmentLocalCacheManager cacheManager;
+ private TaskDataSegmentProvider provider;
+ private ListeningExecutorService exec;
+ private IndexIO indexIO = EasyMock.mock(IndexIO.class);
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() throws Exception
+ {
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
+
+ EasyMock.reset(indexIO);
+ EasyMock.expect(indexIO.loadIndex(EasyMock.anyObject())).andReturn(new
TestQueryableIndex()).anyTimes();
+ EasyMock.replay(indexIO);
+
+ final ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
+ jsonMapper.registerSubtypes(TestLoadSpec.class);
+
+ segments = new ArrayList<>();
+
+ for (int i = 0; i < NUM_SEGMENTS; i++) {
+      // Two segments per interval; helps verify that directory creation +
deletion does not race.
+ final DateTime startTime = DateTimes.of("2000").plusDays(i / 2);
+ final int partitionNum = i % 2;
+
+ segments.add(
+ DataSegment.builder()
+ .dataSource(DATASOURCE)
+ .interval(
+ Intervals.utc(
+ startTime.getMillis(),
+ startTime.plusDays(1).getMillis()
+ )
+ )
+ .version("0")
+ .shardSpec(new NumberedShardSpec(partitionNum, 2))
+ .loadSpec(
+ jsonMapper.convertValue(
+ new TestLoadSpec(i),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ )
+ )
+ .size(1)
+ .build()
+ );
+ }
+
+ cacheDir = temporaryFolder.newFolder();
+ cacheManager = new SegmentLocalCacheManager(
+ new SegmentLoaderConfig().withLocations(
+ ImmutableList.of(new StorageLocationConfig(cacheDir,
10_000_000_000L, null))
+ ),
+ jsonMapper
+ );
+
+ provider = new TaskDataSegmentProvider(
+ new TestCoordinatorServiceClientImpl(),
+ cacheManager,
+ indexIO
+ );
+
+ exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(THREADS,
getClass().getSimpleName() + "-%s"));
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ if (indexIO != null) {
+ EasyMock.verify(indexIO);
+ }
+
+ if (exec != null) {
+ exec.shutdownNow();
+ exec.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ }
+
+ @Test
+ public void testConcurrency()
+ {
+ final int iterations = 1000;
+ final List<ListenableFuture<Boolean>> testFutures = new ArrayList<>();
+
+ for (int i = 0; i < iterations; i++) {
+ final int expectedSegmentNumber = i % NUM_SEGMENTS;
+ final DataSegment segment = segments.get(expectedSegmentNumber);
+ final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
+ exec.submit(() -> provider.fetchSegment(segment.getId(), new
ChannelCounters()));
+
+ testFutures.add(
+ FutureUtils.transform(
+ f,
+ holderSupplier -> {
+ try {
+ final ResourceHolder<Segment> holder = holderSupplier.get();
+ Assert.assertEquals(segment.getId(), holder.get().getId());
+
+ final String expectedStorageDir =
DataSegmentPusher.getDefaultStorageDir(segment, false);
+ final File expectedFile = new File(
+ StringUtils.format(
+ "%s/%s/%s",
+ cacheDir,
+ expectedStorageDir,
+ LOAD_SPEC_FILE_NAME
+ )
+ );
+
+ Assert.assertTrue(expectedFile.exists());
+ Assert.assertArrayEquals(
+ Ints.toByteArray(expectedSegmentNumber),
+ Files.readAllBytes(expectedFile.toPath())
+ );
+
+ holder.close();
+
+ return true;
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ )
+ );
+ }
+
+ Assert.assertEquals(iterations, testFutures.size());
+ for (int i = 0; i < iterations; i++) {
+ ListenableFuture<Boolean> testFuture = testFutures.get(i);
+ Assert.assertTrue("Test iteration #" + i,
FutureUtils.getUnchecked(testFuture, false));
+ }
+
+ // Cache dir should exist, but be empty, since we've closed all holders.
+ Assert.assertTrue(cacheDir.exists());
+ Assert.assertArrayEquals(new String[]{}, cacheDir.list());
+ }
+
+ private class TestCoordinatorServiceClientImpl implements
CoordinatorServiceClient
+ {
+ @Override
+ public ListenableFuture<DataSegment> fetchUsedSegment(String dataSource,
String segmentId)
+ {
+ for (final DataSegment segment : segments) {
+ if (segment.getDataSource().equals(dataSource) &&
segment.getId().toString().equals(segmentId)) {
+ return Futures.immediateFuture(segment);
+ }
+ }
+
+ return Futures.immediateFailedFuture(new ISE("No such segment[%s] for
dataSource[%s]", segmentId, dataSource));
+ }
+
+ @Override
+ public CoordinatorServiceClient withRetryPolicy(ServiceRetryPolicy
retryPolicy)
+ {
+ return this;
+ }
+ }
+
+ @JsonTypeName("test")
+ private static class TestLoadSpec implements LoadSpec
+ {
+ private final int uniqueId;
+
+ @JsonCreator
+ public TestLoadSpec(@JsonProperty("uniqueId") int uniqueId)
+ {
+ this.uniqueId = uniqueId;
+ }
+
+ @JsonProperty
+ public int getUniqueId()
+ {
+ return uniqueId;
+ }
+
+ @Override
+ public LoadSpecResult loadSegment(File destDir) throws
SegmentLoadingException
+ {
+ try {
+ Files.write(new File(destDir, LOAD_SPEC_FILE_NAME).toPath(),
Ints.toByteArray(uniqueId));
+ Files.write(new File(destDir, "version.bin").toPath(),
Ints.toByteArray(1));
+ return new LoadSpecResult(1);
+ }
+ catch (IOException e) {
+ throw new SegmentLoadingException(e, "Failed to load segment in
location [%s]", destDir);
+ }
+ }
+ }
+
+ private static class TestQueryableIndex implements QueryableIndex
+ {
+ @Override
+ public Interval getDataInterval()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumRows()
+ {
+ return 0;
+ }
+
+ @Override
+ public Indexed<String> getAvailableDimensions()
+ {
+ return new ListIndexed<>();
+ }
+
+ @Override
+ public BitmapFactory getBitmapFactoryForDimensions()
+ {
+ return RoaringBitmapFactory.INSTANCE;
+ }
+
+ @Nullable
+ @Override
+ public Metadata getMetadata()
+ {
+ return null;
+ }
+
+ @Override
+ public Map<String, DimensionHandler> getDimensionHandlers()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public List<String> getColumnNames()
+ {
+ return Collections.emptyList();
+ }
+
+ @Nullable
+ @Override
+ public ColumnHolder getColumnHolder(String columnName)
+ {
+ return null;
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index c6c76ae2e9..7fde0083bc 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.querykit.scan;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
@@ -38,13 +39,11 @@ import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.StagePartition;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.test.LimitedFrameWriterFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
@@ -150,7 +149,7 @@ public class ScanQueryFrameProcessorTest extends
InitializedNullHandlingTest
}
}
},
- new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
+ new ReferenceCountingResourceHolder<>(frameWriterFactory, () -> {}),
null,
0L,
new DefaultObjectMapper()
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index f08072d100..b11af677b5 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
@@ -35,14 +37,12 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
@@ -80,7 +80,6 @@ import org.joda.time.Interval;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@@ -168,8 +167,7 @@ public class CalciteMSQTestsHelper
));
binder.bind(DataSegmentAnnouncer.class).toInstance(new
NoopDataSegmentAnnouncer());
binder.bind(DataSegmentProvider.class)
- .toInstance((dataSegment, channelCounters) ->
- new
LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
+ .toInstance((dataSegment, channelCounters) ->
getSupplierForSegment(dataSegment));
GroupByQueryConfig groupByQueryConfig = new GroupByQueryConfig();
binder.bind(GroupByStrategySelector.class)
@@ -185,7 +183,7 @@ public class CalciteMSQTestsHelper
);
}
- private static Supplier<Pair<Segment, Closeable>>
getSupplierForSegment(SegmentId segmentId)
+ private static Supplier<ResourceHolder<Segment>>
getSupplierForSegment(SegmentId segmentId)
{
final TemporaryFolder temporaryFolder = new TemporaryFolder();
try {
@@ -291,13 +289,6 @@ public class CalciteMSQTestsHelper
{
}
};
- return new Supplier<Pair<Segment, Closeable>>()
- {
- @Override
- public Pair<Segment, Closeable> get()
- {
- return new Pair<>(segment, Closer.create());
- }
- };
+ return () -> new ReferenceCountingResourceHolder<>(segment,
Closer.create());
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index cc97d4bd82..2e0e09ff19 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -36,6 +36,8 @@ import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
@@ -66,7 +68,6 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.input.InputSourceModule;
@@ -95,7 +96,6 @@ import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.querykit.DataSegmentProvider;
-import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -180,7 +180,6 @@ import org.mockito.Mockito;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -387,8 +386,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder.bind(QueryProcessingPool.class)
.toInstance(new
ForwardingQueryProcessingPool(Execs.singleThreaded("Test-runner-processing-pool")));
binder.bind(DataSegmentProvider.class)
- .toInstance((dataSegment, channelCounters) ->
- new
LazyResourceHolder<>(getSupplierForSegment(dataSegment)));
+ .toInstance((dataSegment, channelCounters) ->
getSupplierForSegment(dataSegment));
binder.bind(IndexIO.class).toInstance(indexIO);
binder.bind(SpecificSegmentsQuerySegmentWalker.class).toInstance(qf.walker());
@@ -551,7 +549,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
@Nonnull
- private Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId
segmentId)
+ private Supplier<ResourceHolder<Segment>> getSupplierForSegment(SegmentId
segmentId)
{
if (segmentManager.getSegment(segmentId) == null) {
final QueryableIndex index;
@@ -651,7 +649,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
};
segmentManager.addSegment(segment);
}
- return () -> new Pair<>(segmentManager.getSegment(segmentId),
Closer.create());
+ return () ->
ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId));
}
public SelectTester testSelectQuery()
diff --git
a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
b/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java
similarity index 51%
copy from
processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
copy to
processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java
index 5f35bc9c44..e2c8234d38 100644
--- a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
+++
b/processing/src/main/java/org/apache/druid/collections/CloseableResourceHolder.java
@@ -19,14 +19,39 @@
package org.apache.druid.collections;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.utils.CloseableUtils;
+
import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicReference;
-/**
- */
-public interface ResourceHolder<T> extends Closeable
+public class CloseableResourceHolder<T extends Closeable> implements
ResourceHolder<T>
{
- T get();
+ private final AtomicReference<T> resource;
+
+ /**
+ * Use {@link ResourceHolder#fromCloseable}.
+ */
+ CloseableResourceHolder(T resource)
+ {
+ this.resource = new AtomicReference<>(Preconditions.checkNotNull(resource,
"resource"));
+ }
+
+ @Override
+ public T get()
+ {
+ final T retVal = resource.get();
+ if (retVal == null) {
+ throw new ISE("Already closed");
+ }
+ return retVal;
+ }
@Override
- void close();
+ public void close()
+ {
+ final T oldResource = resource.getAndSet(null);
+ CloseableUtils.closeAndWrapExceptions(oldResource);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
b/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
index 808100dfb2..6d731923c5 100644
---
a/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
+++
b/processing/src/main/java/org/apache/druid/collections/ReferenceCountingResourceHolder.java
@@ -77,13 +77,14 @@ public class ReferenceCountingResourceHolder<T> implements
ResourceHolder<T>
}
/**
- * Increments the reference count by 1 and returns a {@link Releaser}. The
returned {@link Releaser} is used to
- * decrement the reference count when the caller no longer needs the
resource.
+   * Increments the reference count by 1 and returns a {@link ResourceHolder}
representing the new reference.
+ * The returned {@link ResourceHolder} "close" method decrements the
reference count when the caller no longer
+ * needs the resource.
*
- * {@link Releaser}s are not thread-safe. If multiple threads need
references to the same holder, they should
- * each acquire their own {@link Releaser}.
+   * Returned {@link ResourceHolder}s are not thread-safe. If multiple threads
need references to the same resource, they
+   * should each call this method on the original object.
*/
- public Releaser increment()
+ public ResourceHolder<T> increment()
{
while (true) {
int count = this.refCount.get();
@@ -95,11 +96,17 @@ public class ReferenceCountingResourceHolder<T> implements
ResourceHolder<T>
}
}
- // This Releaser is supposed to be used from a single thread, so no
synchronization/atomicity
- return new Releaser()
+ // This ResourceHolder is supposed to be used from a single thread, so no
synchronization/atomicity
+ return new ResourceHolder<T>()
{
boolean released = false;
+ @Override
+ public T get()
+ {
+ return object;
+ }
+
@Override
public void close()
{
diff --git
a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
b/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
index 5f35bc9c44..0eab667aeb 100644
--- a/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
+++ b/processing/src/main/java/org/apache/druid/collections/ResourceHolder.java
@@ -22,6 +22,7 @@ package org.apache.druid.collections;
import java.io.Closeable;
/**
+ *
*/
public interface ResourceHolder<T> extends Closeable
{
@@ -29,4 +30,9 @@ public interface ResourceHolder<T> extends Closeable
@Override
void close();
+
+ static <T extends Closeable> ResourceHolder<T> fromCloseable(final T
resource)
+ {
+ return new CloseableResourceHolder<>(resource);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index 6718dff9f8..1ad44bc84e 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -32,7 +32,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
-import org.apache.druid.collections.Releaser;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -59,6 +58,7 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import
org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
+import java.io.Closeable;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
@@ -243,9 +243,9 @@ public class GroupByMergingQueryRunnerV2 implements
QueryRunner<ResultRow>
try (
// These variables are used to close
releasers automatically.
@SuppressWarnings("unused")
- Releaser bufferReleaser =
mergeBufferHolder.increment();
+ Closeable bufferReleaser =
mergeBufferHolder.increment();
@SuppressWarnings("unused")
- Releaser grouperReleaser =
grouperHolder.increment()
+ Closeable grouperReleaser =
grouperHolder.increment()
) {
// Return true if OK, false if
resources were exhausted.
return input.run(queryPlusForRunners,
responseContext)
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
index 5d244b579a..d00f6f4785 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ParallelCombiner.java
@@ -28,7 +28,6 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
-import org.apache.druid.collections.Releaser;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -48,7 +47,7 @@ import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
-
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -409,7 +408,7 @@ public class ParallelCombiner<KeyType>
);
// This variable is used to close releaser automatically.
@SuppressWarnings("unused")
- final Releaser releaser = combineBufferHolder.increment()
+ final Closeable releaser = combineBufferHolder.increment()
) {
while (mergedIterator.hasNext()) {
final Entry<KeyType> next = mergedIterator.next();
diff --git
a/processing/src/main/java/org/apache/druid/collections/Releaser.java
b/processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java
similarity index 57%
rename from processing/src/main/java/org/apache/druid/collections/Releaser.java
rename to
processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java
index f9f2d77ec3..783e24889c 100644
--- a/processing/src/main/java/org/apache/druid/collections/Releaser.java
+++
b/processing/src/test/java/org/apache/druid/collections/CloseableResourceHolderTest.java
@@ -19,13 +19,29 @@
package org.apache.druid.collections;
+import org.junit.Assert;
+import org.junit.Test;
+
import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
-/**
- * Releaser is like Closeable, but doesn't throw IOExceptions.
- */
-public interface Releaser extends Closeable
+public class CloseableResourceHolderTest
{
- @Override
- void close();
+ @Test
+ public void testCloseableResourceHolder()
+ {
+ final AtomicLong closeCounter = new AtomicLong();
+ final Closeable closeable = closeCounter::incrementAndGet;
+ final ResourceHolder<Closeable> holder =
ResourceHolder.fromCloseable(closeable);
+
+ Assert.assertSame(closeable, holder.get());
+
+ holder.close();
+ Assert.assertEquals(1, closeCounter.get());
+
+ holder.close();
+ Assert.assertEquals(1, closeCounter.get());
+
+ Assert.assertThrows(IllegalStateException.class, holder::get);
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
b/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
index 29df2c3aba..9c9983450e 100644
---
a/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
+++
b/processing/src/test/java/org/apache/druid/collections/ReferenceCountingResourceHolderTest.java
@@ -45,7 +45,7 @@ public class ReferenceCountingResourceHolderTest
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
- try (Releaser r = resourceHolder.increment()) {
+ try (ResourceHolder<Closeable> r = resourceHolder.increment()) {
try {
Thread.sleep(1);
}
diff --git
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index c6c216594e..5f7e71501c 100644
---
a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -519,13 +519,13 @@ public class SegmentLocalCacheManager implements
SegmentCacheManager
catch (Exception e) {
log.error(e, "Unable to remove directory[%s]", cacheFile);
}
- }
- File parent = cacheFile.getParentFile();
- if (parent != null) {
- File[] children = parent.listFiles();
- if (children == null || children.length == 0) {
- cleanupCacheFiles(baseFile, parent);
+ File parent = cacheFile.getParentFile();
+ if (parent != null) {
+ File[] children = parent.listFiles();
+ if (children == null || children.length == 0) {
+ cleanupCacheFiles(baseFile, parent);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]