gianm commented on code in PR #18076:
URL: https://github.com/apache/druid/pull/18076#discussion_r2130278299
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java:
##########
@@ -170,9 +173,7 @@ public WorkerStorageParameters storageParameters()
@Override
public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
- // We don't query data servers. This factory won't actually be used, because Dart doesn't allow segmentSource to be
- // overridden; it always uses SegmentSource.NONE. (If it is called, some wires got crossed somewhere.)
- return null;
+ return dataServerQueryHandlerFactory;
Review Comment:
🎉
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartServerAssignment.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import org.apache.druid.msq.dart.worker.DartQueryableSegment;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents the set of segments assigned to a particular dart worker, used by {@link DartTableInputSpecSlicer}.
+ */
+public class DartServerAssignment
Review Comment:
This should be called `DartSegmentAssignment`, since it's assigning
segments. (It's also assigning _to_ workers, so it could be called
`DartWorkerAssignment`. Either way, `DartServerAssignment` isn't quite right.)
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java:
##########
@@ -71,15 +78,32 @@ public class DartTableInputSpecSlicer implements InputSpecSlicer
*/
private final TimelineServerView serverView;
- DartTableInputSpecSlicer(final Object2IntMap<String> workerIdToNumber, final TimelineServerView serverView)
+ /**
+ * Determines the kind of tasks that should be queried.
+ */
+ private final SegmentSource segmentSource;
+
+ /**
+ * Determines if cloning historicals should be queried.
+ */
+ private final CloneQueryMode cloneQueryMode;
+
+ DartTableInputSpecSlicer(
+ final Object2IntMap<String> workerIdToNumber,
+ final TimelineServerView serverView,
+ final QueryContext queryContext
+ )
{
this.workerIdToNumber = workerIdToNumber;
this.serverView = serverView;
+ this.segmentSource = MultiStageQueryContext.getSegmentSources(queryContext);
Review Comment:
Since Dart SQL is meant to be equivalent to native SQL, it should have
`segmentSource` default to `REALTIME`.
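For illustration only, a minimal sketch of resolving the segment source with a caller-supplied default. The class, the `getSegmentSource` helper, and the context key are hypothetical stand-ins written for this example, not the actual `MultiStageQueryContext` API; the enum carries only the two values named in this thread.

```java
import java.util.Locale;
import java.util.Map;

public class SegmentSourceDefaultSketch
{
  // Stand-in for the MSQ SegmentSource enum (only NONE and REALTIME are modeled here).
  public enum SegmentSource { NONE, REALTIME }

  // Context key assumed for illustration.
  public static final String CTX_SEGMENT_SOURCE = "includeSegmentSource";

  // Returns the value from the query context if present, otherwise the caller's default.
  public static SegmentSource getSegmentSource(
      final Map<String, Object> context,
      final SegmentSource defaultValue
  )
  {
    final Object value = context.get(CTX_SEGMENT_SOURCE);
    if (value == null) {
      return defaultValue;
    }
    return SegmentSource.valueOf(value.toString().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args)
  {
    // A Dart-style caller passes REALTIME as its default; an explicit setting still wins.
    System.out.println(getSegmentSource(Map.of(), SegmentSource.REALTIME));
    System.out.println(getSegmentSource(Map.of(CTX_SEGMENT_SOURCE, "none"), SegmentSource.REALTIME));
  }
}
```

Passing the default in from the caller keeps the task-based engine's existing default untouched while letting Dart pick its own.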
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableRealtimeTask.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+/**
+ * Represents a realtime segment, that must be queried at a task pointed to by {@link DruidServerMetadata}.
+ */
+public class DartQueryableRealtimeTask extends DartQueryableSegment
Review Comment:
Looking at how this is used, IMO it would be cleaner to incorporate
`@Nullable DruidServerMetadata realtimeServer` into `DartQueryableSegment`
rather than using a subclass. There's too much `instanceof` and casting going
on, which is a sign that subclassing isn't very useful.
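A minimal sketch of that shape, for illustration. `ServerMetadata` and `QueryableSegment` are simplified, hypothetical stand-ins for `DruidServerMetadata` and `DartQueryableSegment`, and the field set is assumed rather than copied from the PR.

```java
import java.util.Objects;

public class QueryableSegmentSketch
{
  // Hypothetical stand-in for DruidServerMetadata.
  public static final class ServerMetadata
  {
    private final String host;

    public ServerMetadata(final String host)
    {
      this.host = Objects.requireNonNull(host, "host");
    }

    public String getHost()
    {
      return host;
    }
  }

  // Single class covering both cases: realtimeServer is null unless the segment
  // must be queried at a realtime task.
  public static final class QueryableSegment
  {
    private final String segmentId;
    private final int workerNumber;
    private final ServerMetadata realtimeServer; // nullable

    public QueryableSegment(final String segmentId, final int workerNumber, final ServerMetadata realtimeServer)
    {
      this.segmentId = Objects.requireNonNull(segmentId, "segmentId");
      this.workerNumber = workerNumber;
      this.realtimeServer = realtimeServer;
    }

    public String getSegmentId()
    {
      return segmentId;
    }

    public int getWorkerNumber()
    {
      return workerNumber;
    }

    // May return null; callers check isRealtime() instead of using instanceof.
    public ServerMetadata getRealtimeServer()
    {
      return realtimeServer;
    }

    public boolean isRealtime()
    {
      return realtimeServer != null;
    }
  }

  public static void main(String[] args)
  {
    final QueryableSegment published = new QueryableSegment("seg-published", 0, null);
    final QueryableSegment realtime = new QueryableSegment("seg-realtime", 1, new ServerMetadata("task-host:8100"));
    System.out.println(published.isRealtime());
    System.out.println(realtime.getRealtimeServer().getHost());
  }
}
```

Call sites then branch on `isRealtime()` rather than on `instanceof` plus a cast.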
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java:
##########
@@ -228,27 +282,28 @@ static Set<DartQueryableSegment> findQueryableDataSegments(
*
* @throws IllegalStateException if any provided segments do not match the provided datasource
*/
- static List<InputSlice> makeSegmentSlices(
+ private List<InputSlice> makeSegmentSlices(
final String dataSource,
- final List<List<DartQueryableSegment>> assignments
+ final List<DartServerAssignment> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
- for (final List<DartQueryableSegment> assignment : assignments) {
+ for (final DartServerAssignment assignment : assignments) {
if (assignment == null || assignment.isEmpty()) {
retVal.add(NilInputSlice.INSTANCE);
} else {
- final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
- for (final DartQueryableSegment segment : assignment) {
- if (!dataSource.equals(segment.getSegment().getDataSource())) {
- throw new ISE("Expected dataSource[%s] but got[%s]", dataSource, segment.getSegment().getDataSource());
- }
-
- descriptors.add(toRichSegmentDescriptor(segment));
- }
-
- retVal.add(new SegmentsInputSlice(dataSource, descriptors, ImmutableList.of()));
+ final List<RichSegmentDescriptor> descriptors = assignment.getDartQueryableSegments()
Review Comment:
style nit: this is actually longer (more lines) than the original imperative
code that didn't use streams. It's also indented far to the right, which leads
to very long lines. IMO, the original imperative code was better stylistically.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java:
##########
@@ -218,6 +252,26 @@ static Set<DartQueryableSegment> findQueryableDataSegments(
);
}
+ private DartQueryableSegment toDartQueryableSegment(
+ ServerSelector serverSelector,
+ Interval interval,
+ ToIntFunction<ServerSelector> toWorkersFunction
+ )
+ {
+ final DataSegment dataSegment = serverSelector.getSegment();
+ if (serverSelector.isRealtimeSegment()) {
+ final Set<DruidServerMetadata> servers = serverSelector.getAllServers(cloneQueryMode)
Review Comment:
style nit: this stack of calls starts too far to the right and so it leads
to very long lines with lots of dead air on the left-hand side. It would look
nicer to start it on the next line and format it like this:
```
final Set<DruidServerMetadata> servers =
    serverSelector.getAllServers(cloneQueryMode)
                  .stream()
                  .filter(druidServerMetadata -> segmentSource.getUsedServerTypes()
                                                              .contains(druidServerMetadata.getType()))
                  .collect(Collectors.toSet());
```