cryptoe commented on code in PR #15399:
URL: https://github.com/apache/druid/pull/15399#discussion_r1445965449
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -192,39 +196,45 @@ private static ScanQuery
prepareScanQueryForDataServer(@NotNull ScanQuery scanQu
}
@Override
- protected ReturnOrAwait<Unit> runWithLoadedSegment(final
SegmentWithDescriptor segment) throws IOException
+ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final
DataServerQueryHandler dataServerQueryHandler) throws IOException
{
if (cursor == null) {
ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
- final Pair<LoadedSegmentDataProvider.DataServerQueryStatus,
Yielder<Object[]>> statusSequencePair =
- segment.fetchRowsFromDataServer(
+ final DataServerQueryResult<Object[]> dataServerQueryResult =
+ dataServerQueryHandler.fetchRowsFromDataServer(
preparedQuery,
ScanQueryFrameProcessor::mappingFunction,
closer
);
- if
(LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF.equals(statusSequencePair.lhs))
{
- log.info("Segment[%s] was handed off, falling back to fetching the
segment from deep storage.",
- segment.getDescriptor());
- return runWithSegment(segment);
+ handedOffSegments = dataServerQueryResult.getHandedOffSegments();
+ if (!handedOffSegments.getDescriptors().isEmpty()) {
+ log.info(
+ "Query to dataserver for segments found [%d] handed off segments",
+ handedOffSegments.getDescriptors().size()
+ );
}
-
RowSignature rowSignature =
ScanQueryKit.getAndValidateSignature(preparedQuery, jsonMapper);
- Pair<Cursor, Closeable> cursorFromIterable =
IterableRowsCursorHelper.getCursorFromYielder(
- statusSequencePair.rhs,
- rowSignature
- );
+ List<Cursor> cursors =
dataServerQueryResult.getResultsYielders().stream().map(yielder -> {
+ Pair<Cursor, Closeable> cursorFromIterable =
IterableRowsCursorHelper.getCursorFromYielder(
+ yielder,
+ rowSignature
+ );
+ closer.register(cursorFromIterable.rhs);
+ return cursorFromIterable.lhs;
+ }).collect(Collectors.toList());
- closer.register(cursorFromIterable.rhs);
- final Yielder<Cursor> cursorYielder =
Yielders.each(Sequences.simple(ImmutableList.of(cursorFromIterable.lhs)));
+ final Yielder<Cursor> cursorYielder =
Yielders.each(Sequences.simple(cursors));
if (cursorYielder.isDone()) {
// No cursors!
cursorYielder.close();
- return ReturnOrAwait.returnObject(Unit.instance());
+ return ReturnOrAwait.returnObject(handedOffSegments);
Review Comment:
If there are no handed-off segments, null will be passed. Is that intended?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java:
##########
@@ -100,23 +105,38 @@ public GroupByPreShuffleFrameProcessor(
}
@Override
- protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor
segment) throws IOException
+ protected ReturnOrAwait<SegmentsInputSlice>
runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws
IOException
{
- if (resultYielder == null) {
- Pair<LoadedSegmentDataProvider.DataServerQueryStatus,
Yielder<ResultRow>> statusSequencePair =
-
segment.fetchRowsFromDataServer(groupingEngine.prepareGroupByQuery(query),
Function.identity(), closer);
- if
(LoadedSegmentDataProvider.DataServerQueryStatus.HANDOFF.equals(statusSequencePair.lhs))
{
- log.info("Segment[%s] was handed off, falling back to fetching the
segment from deep storage.",
- segment.getDescriptor());
- return runWithSegment(segment);
+ if (resultYielder == null || resultYielder.isDone()) {
+ if (yielderYielder == null) {
Review Comment:
Let's rename this to `currentResultsYielder`?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -110,7 +110,8 @@ protected FrameWriterFactory getFrameWriterFactory()
}
protected abstract ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor
segment) throws IOException;
- protected abstract ReturnOrAwait<Unit>
runWithLoadedSegment(SegmentWithDescriptor segment) throws IOException;
+
+ protected abstract ReturnOrAwait<SegmentsInputSlice>
runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws
IOException;
Review Comment:
Please add Javadoc for this method.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java:
##########
@@ -146,32 +142,91 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
);
// Function to generate a processor manger for the regular processors,
which run after the segmentMapFnProcessor.
- final Function<Function<SegmentReference, SegmentReference>,
ProcessorManager<Object, Long>> processorManagerFn =
- segmentMapFn ->
- new BaseLeafFrameProcessorManager(
- processorBaseInputs,
- segmentMapFn,
- frameWriterFactoryQueue,
- channelQueue,
- frameContext,
- this
- );
+ final Function<List<Function<SegmentReference, SegmentReference>>,
ProcessorManager<Object, Long>> processorManagerFn = segmentMapFnList -> {
+ final Function<SegmentReference, SegmentReference> segmentMapFunction =
+ CollectionUtils.getOnlyElement(segmentMapFnList, throwable ->
DruidException.defensive("Only one segment map function expected"));
+ return createBaseLeafProcessorManagerWithHandoff(
+ stageDefinition,
+ inputSlices,
+ inputSliceReader,
+ counters,
+ warningPublisher,
+ segmentMapFunction,
+ frameWriterFactoryQueue,
+ channelQueue,
+ frameContext
+ );
+ };
//noinspection rawtypes
final ProcessorManager processorManager;
if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
query.getDataSource().createSegmentMapFunction(query, new
AtomicLong());
- processorManager = processorManagerFn.apply(segmentMapFn);
+ processorManager =
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
- processorManager = new ChainedProcessorManager<>(segmentMapFnProcessor,
processorManagerFn);
+ processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(()
-> segmentMapFnProcessor), processorManagerFn);
}
//noinspection unchecked,rawtypes
return new ProcessorsAndChannels<>(processorManager,
OutputChannels.wrapReadOnly(outputChannels));
}
+ private ProcessorManager<Object, Long>
createBaseLeafProcessorManagerWithHandoff(
+ final StageDefinition stageDefinition,
+ final List<InputSlice> inputSlices,
+ final InputSliceReader inputSliceReader,
+ final CounterTracker counters,
+ final Consumer<Throwable> warningPublisher,
+ final Function<SegmentReference, SegmentReference> segmentMapFunction,
+ final Queue<FrameWriterFactory> frameWriterFactoryQueue,
+ final Queue<WritableFrameChannel> channelQueue,
+ final FrameContext frameContext
+ )
+ {
+ final BaseLeafFrameProcessorFactory factory = this;
+ // Read all base inputs in separate processors, one per processor.
+ final Iterable<ReadableInput> processorBaseInputs = readBaseInputs(
+ stageDefinition,
+ inputSlices,
+ inputSliceReader,
+ counters,
+ warningPublisher
+ );
+
+ return new ChainedProcessorManager<>(
+ new BaseLeafFrameProcessorManager(
+ processorBaseInputs,
+ segmentMapFunction,
+ frameWriterFactoryQueue,
+ channelQueue,
+ frameContext,
+ factory
+ ),
+ objects -> {
+ if (objects.isEmpty()) {
+ return ProcessorManagers.none();
+ }
+ List<InputSlice> handedOffSegments = new ArrayList<>();
+ for (Object o : objects) {
+ if (o instanceof SegmentsInputSlice) {
Review Comment:
`o` can be null as well.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java:
##########
@@ -24,32 +24,34 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.manager.ProcessorAndCallback;
import org.apache.druid.frame.processor.manager.ProcessorManager;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
- * Manager that chains processors: runs {@link #first} first, then based on
its result, creates {@link #restFuture}
- * using {@link #restFactory} and runs that next.
+ * Manager that chains processors: runs all processors generated by {@link
#first} first, then based on its result,
+ * creates {@link #restFuture} using {@link #restFactory} and runs that next.
*/
public class ChainedProcessorManager<A, B, R> implements
ProcessorManager<Object, R>
Review Comment:
@gianm Can you please vet the changes in this class.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java:
##########
@@ -169,9 +243,40 @@ public RichSegmentDescriptor toRichSegmentDescriptor()
segment.getInterval(),
interval,
segment.getVersion(),
- segment.getShardSpec().getPartitionNum(),
- segment instanceof DataSegmentWithLocation ?
((DataSegmentWithLocation) segment).getServers() : null
+ segment.getShardSpec().getPartitionNum()
);
}
+
+ @Override
+ public long getWeight()
+ {
+ return segment.getSize();
+ }
+ }
+
+ private static class DataServerRequest implements WeightedInputInstance
+ {
+ private static final long DATA_SERVER_WEIGHT_ESTIMATION = 5000L;
+ private final List<DataSegmentWithInterval> segments;
+ private final DruidServerMetadata serverMetadata;
+
+ public DataServerRequest(DruidServerMetadata serverMetadata,
List<DataSegmentWithInterval> segments)
+ {
+ this.segments = Preconditions.checkNotNull(segments, "segments");
+ this.serverMetadata = Preconditions.checkNotNull(serverMetadata,
"server");
+ }
+
+ @Override
+ public long getWeight()
+ {
+ return segments.size() * DATA_SERVER_WEIGHT_ESTIMATION;
Review Comment:
We are estimating the number of rows to be 5000 for realtime segments, right?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java:
##########
@@ -126,28 +161,67 @@ private Set<DataSegmentWithInterval>
getPrunedSegmentSet(final TableInputSpec ta
private static List<InputSlice> makeSlices(
final TableInputSpec tableInputSpec,
- final List<List<DataSegmentWithInterval>> assignments
+ final List<List<WeightedInputInstance>> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
- for (final List<DataSegmentWithInterval> assignment : assignments) {
+ for (final List<WeightedInputInstance> assignment : assignments) {
+
final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
- for (final DataSegmentWithInterval dataSegmentWithInterval : assignment)
{
- descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+ final List<DataServerRequestDescriptor> dataServerRequests = new
ArrayList<>();
+
+ for (final WeightedInputInstance weightedSegment : assignment) {
+ if (weightedSegment instanceof DataSegmentWithInterval) {
+ DataSegmentWithInterval dataSegmentWithInterval =
(DataSegmentWithInterval) weightedSegment;
+ descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+ } else {
+ DataServerRequest serverRequest = (DataServerRequest)
weightedSegment;
+
dataServerRequests.add(serverRequest.toDataServerRequestDescriptor());
+ }
}
- if (descriptors.isEmpty()) {
+ if (descriptors.isEmpty() && dataServerRequests.isEmpty()) {
retVal.add(NilInputSlice.INSTANCE);
} else {
- retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(),
descriptors));
+ retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(),
descriptors, dataServerRequests));
}
}
return retVal;
}
- private static class DataSegmentWithInterval
+ private static List<WeightedInputInstance>
createWeightedSegmentSet(List<DataSegmentWithInterval> prunedServedSegments)
Review Comment:
Can you please add Javadoc to this method?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/TableInputSpecSlicer.java:
##########
@@ -126,28 +161,67 @@ private Set<DataSegmentWithInterval>
getPrunedSegmentSet(final TableInputSpec ta
private static List<InputSlice> makeSlices(
final TableInputSpec tableInputSpec,
- final List<List<DataSegmentWithInterval>> assignments
+ final List<List<WeightedInputInstance>> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
- for (final List<DataSegmentWithInterval> assignment : assignments) {
+ for (final List<WeightedInputInstance> assignment : assignments) {
+
final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
- for (final DataSegmentWithInterval dataSegmentWithInterval : assignment)
{
- descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+ final List<DataServerRequestDescriptor> dataServerRequests = new
ArrayList<>();
+
+ for (final WeightedInputInstance weightedSegment : assignment) {
+ if (weightedSegment instanceof DataSegmentWithInterval) {
+ DataSegmentWithInterval dataSegmentWithInterval =
(DataSegmentWithInterval) weightedSegment;
+ descriptors.add(dataSegmentWithInterval.toRichSegmentDescriptor());
+ } else {
+ DataServerRequest serverRequest = (DataServerRequest)
weightedSegment;
+
dataServerRequests.add(serverRequest.toDataServerRequestDescriptor());
+ }
}
- if (descriptors.isEmpty()) {
+ if (descriptors.isEmpty() && dataServerRequests.isEmpty()) {
retVal.add(NilInputSlice.INSTANCE);
} else {
- retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(),
descriptors));
+ retVal.add(new SegmentsInputSlice(tableInputSpec.getDataSource(),
descriptors, dataServerRequests));
}
}
return retVal;
}
- private static class DataSegmentWithInterval
+ private static List<WeightedInputInstance>
createWeightedSegmentSet(List<DataSegmentWithInterval> prunedServedSegments)
+ {
+ // Create a map of server to segment for loaded segments.
+ final Map<DruidServerMetadata, Set<DataSegmentWithInterval>>
serverVsSegmentsMap = new HashMap<>();
+ for (DataSegmentWithInterval dataSegmentWithInterval :
prunedServedSegments) {
+ DataSegmentWithLocation segmentWithLocation = (DataSegmentWithLocation)
dataSegmentWithInterval.segment;
+ // Choose a server out of the ones available.
+ DruidServerMetadata druidServerMetadata =
DataServerSelector.RANDOM.getSelectServerFunction().apply(segmentWithLocation.getServers());
+
+ serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> new
HashSet<>());
+
serverVsSegmentsMap.get(druidServerMetadata).add(dataSegmentWithInterval);
+ }
+
+ List<WeightedInputInstance> retVal = new ArrayList<>();
+ for (Map.Entry<DruidServerMetadata, Set<DataSegmentWithInterval>>
druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) {
+ DataServerRequest dataServerRequest = new DataServerRequest(
+ druidServerMetadataSetEntry.getKey(),
+ ImmutableList.copyOf(druidServerMetadataSetEntry.getValue())
+ );
+ retVal.add(dataServerRequest);
+ }
+
+ return retVal;
+ }
+
+ private interface WeightedInputInstance
Review Comment:
I think we need docs here as well.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ChainedProcessorManager.java:
##########
@@ -76,22 +91,31 @@ public
ListenableFuture<Optional<ProcessorAndCallback<Object>>> next()
if (closed) {
throw new IllegalStateException();
} else if (first != null) {
- //noinspection unchecked
- final FrameProcessor<Object> tmp = (FrameProcessor<Object>) first;
- first = null;
- return Futures.immediateFuture(Optional.of(new
ProcessorAndCallback<>(tmp, this::onFirstProcessorComplete)));
- } else {
- return FutureUtils.transformAsync(
- restFuture,
- rest -> (ListenableFuture) rest.next()
- );
+ Optional<ProcessorAndCallback<A>> processorAndCallbackOptional =
Futures.getUnchecked(first.next());
+ if (processorAndCallbackOptional.isPresent()) {
+ // More processors left to run.
+ firstProcessorCount.incrementAndGet();
+ ProcessorAndCallback<A> aProcessorAndCallback =
processorAndCallbackOptional.get();
+ //noinspection unchecked
+ return
Futures.immediateFuture(Optional.of((ProcessorAndCallback<Object>)
aProcessorAndCallback));
+ } else {
+ first = null;
+ checkFirstProcessorComplete();
+ }
}
+
+ //noinspection unchecked
+ return FutureUtils.transformAsync(
+ restFuture,
+ rest -> (ListenableFuture) rest.next()
+ );
}
- private void onFirstProcessorComplete(final Object firstResult)
+ private synchronized void checkFirstProcessorComplete()
Review Comment:
Why is `synchronized` needed here?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java:
##########
@@ -146,32 +142,91 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
);
// Function to generate a processor manger for the regular processors,
which run after the segmentMapFnProcessor.
- final Function<Function<SegmentReference, SegmentReference>,
ProcessorManager<Object, Long>> processorManagerFn =
- segmentMapFn ->
- new BaseLeafFrameProcessorManager(
- processorBaseInputs,
- segmentMapFn,
- frameWriterFactoryQueue,
- channelQueue,
- frameContext,
- this
- );
+ final Function<List<Function<SegmentReference, SegmentReference>>,
ProcessorManager<Object, Long>> processorManagerFn = segmentMapFnList -> {
+ final Function<SegmentReference, SegmentReference> segmentMapFunction =
+ CollectionUtils.getOnlyElement(segmentMapFnList, throwable ->
DruidException.defensive("Only one segment map function expected"));
+ return createBaseLeafProcessorManagerWithHandoff(
+ stageDefinition,
+ inputSlices,
+ inputSliceReader,
+ counters,
+ warningPublisher,
+ segmentMapFunction,
+ frameWriterFactoryQueue,
+ channelQueue,
+ frameContext
+ );
+ };
//noinspection rawtypes
final ProcessorManager processorManager;
if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
query.getDataSource().createSegmentMapFunction(query, new
AtomicLong());
- processorManager = processorManagerFn.apply(segmentMapFn);
+ processorManager =
processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
- processorManager = new ChainedProcessorManager<>(segmentMapFnProcessor,
processorManagerFn);
+ processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(()
-> segmentMapFnProcessor), processorManagerFn);
}
//noinspection unchecked,rawtypes
return new ProcessorsAndChannels<>(processorManager,
OutputChannels.wrapReadOnly(outputChannels));
}
+ private ProcessorManager<Object, Long>
createBaseLeafProcessorManagerWithHandoff(
+ final StageDefinition stageDefinition,
+ final List<InputSlice> inputSlices,
+ final InputSliceReader inputSliceReader,
+ final CounterTracker counters,
+ final Consumer<Throwable> warningPublisher,
+ final Function<SegmentReference, SegmentReference> segmentMapFunction,
+ final Queue<FrameWriterFactory> frameWriterFactoryQueue,
+ final Queue<WritableFrameChannel> channelQueue,
+ final FrameContext frameContext
+ )
+ {
+ final BaseLeafFrameProcessorFactory factory = this;
+ // Read all base inputs in separate processors, one per processor.
+ final Iterable<ReadableInput> processorBaseInputs = readBaseInputs(
+ stageDefinition,
+ inputSlices,
+ inputSliceReader,
+ counters,
+ warningPublisher
+ );
+
+ return new ChainedProcessorManager<>(
+ new BaseLeafFrameProcessorManager(
+ processorBaseInputs,
+ segmentMapFunction,
+ frameWriterFactoryQueue,
+ channelQueue,
+ frameContext,
+ factory
+ ),
+ objects -> {
+ if (objects.isEmpty()) {
+ return ProcessorManagers.none();
+ }
+ List<InputSlice> handedOffSegments = new ArrayList<>();
+ for (Object o : objects) {
+ if (o instanceof SegmentsInputSlice) {
+ SegmentsInputSlice slice = (SegmentsInputSlice) o;
+ handedOffSegments.add(slice);
+ }
+ }
+ return new BaseLeafFrameProcessorManager(
Review Comment:
I think we are fetching the handed-off segments from deep storage here. Let's add a comment for
future maintenance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]