cryptoe commented on code in PR #15024:
URL: https://github.com/apache/druid/pull/15024#discussion_r1339546844


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java:
##########
@@ -181,6 +184,15 @@ public static boolean isFinalizeAggregations(final QueryContext queryContext)
     );
   }
 
+  public static boolean isIncludeRealtime(final QueryContext queryContext)

Review Comment:
   This should ideally be an enum that the user provides.
   
   The enum values can be:
   1. Realtime
   2. Hybrid (Realtime + Historical)
   3. Historical
   
   We can change the names to be better, but the general idea is that this query context will evolve for future hybrid use cases.
   
   So how does `includeSegmentSource` sound?
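   A minimal sketch of what the suggested enum could look like. The class name, key handling, and default below are all hypothetical, not the final API:
   
   ```java
   import java.util.Locale;
   
   class SegmentSourceExample
   {
     // Hypothetical enum for the suggested includeSegmentSource context key.
     enum SegmentSource
     {
       REALTIME,    // realtime segments only
       HYBRID,      // realtime + historical
       HISTORICAL   // published/historical segments only
     }
   
     // Hypothetical helper: parse the context value, defaulting to HISTORICAL
     // so that existing queries keep their current behavior.
     static SegmentSource fromContext(String value)
     {
       return value == null
              ? SegmentSource.HISTORICAL
              : SegmentSource.valueOf(value.toUpperCase(Locale.ROOT));
     }
   }
   ```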



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Query;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+public interface LoadedSegmentDataProvider
+{
+  <ReturnType, QueryType> Pair<DataServerQueryStatus, Sequence<ReturnType>> fetchRowsFromDataServer(
+      Query<QueryType> query,
+      Function<Sequence<QueryType>, Sequence<ReturnType>> mappingFunction,
+      Class<QueryType> queryResultType
+  ) throws IOException;
+
+  enum DataServerQueryStatus
+  {
+    SUCCESS,

Review Comment:
   Please add some documentation.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1158,14 +1163,42 @@ private QueryKit makeQueryControllerToolKit()
 
   private DataSegmentTimelineView makeDataSegmentTimelineView()
   {
+    final boolean isIncludeRealtime = MultiStageQueryContext.isIncludeRealtime(task.getQuerySpec().getQuery().context());
+
     return (dataSource, intervals) -> {
-      final Collection<DataSegment> dataSegments =
+      final Iterable<ImmutableSegmentLoadInfo> serverViewSegments;

Review Comment:
   Let's rename this variable to `realtimeAndHistoricalSegments` or `realtimeAndHotSegments`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1158,14 +1163,42 @@ private QueryKit makeQueryControllerToolKit()
 
   private DataSegmentTimelineView makeDataSegmentTimelineView()
   {
+    final boolean isIncludeRealtime = MultiStageQueryContext.isIncludeRealtime(task.getQuerySpec().getQuery().context());
+
     return (dataSource, intervals) -> {
-      final Collection<DataSegment> dataSegments =
+      final Iterable<ImmutableSegmentLoadInfo> serverViewSegments;
+
+      // Fetch the realtime segments first, so that we don't miss any segment if they get handed off between the two calls.

Review Comment:
   I think this API will also return hot/historical segments.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1158,14 +1163,42 @@ private QueryKit makeQueryControllerToolKit()
 
   private DataSegmentTimelineView makeDataSegmentTimelineView()
   {
+    final boolean isIncludeRealtime = MultiStageQueryContext.isIncludeRealtime(task.getQuerySpec().getQuery().context());
+
     return (dataSource, intervals) -> {
-      final Collection<DataSegment> dataSegments =
+      final Iterable<ImmutableSegmentLoadInfo> serverViewSegments;
+
+      // Fetch the realtime segments first, so that we don't miss any segment if they get handed off between the two calls.
+      if (isIncludeRealtime) {
+        serverViewSegments = context.coordinatorClient().fetchServerViewSegments(dataSource, intervals);
+      } else {
+        serverViewSegments = ImmutableList.of();
+      }
+
+      final Collection<DataSegment> metadataStoreSegments =

Review Comment:
   Nit: let's rename this to `publishedUsedSegments`.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Query;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+public interface LoadedSegmentDataProvider
+{
+  <ReturnType, QueryType> Pair<DataServerQueryStatus, Sequence<ReturnType>> fetchRowsFromDataServer(
+      Query<QueryType> query,
+      Function<Sequence<QueryType>, Sequence<ReturnType>> mappingFunction,

Review Comment:
   This mapping function looks like an implementation detail. Why does `fetchRowsFromDataServer` care about it?
   
   You can have two impls of this method, one for scan and one for group by, and do the mapping in the implementation.
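   A rough sketch of the suggested shape, with Druid's `Query`/`Sequence` types replaced by plain Java stand-ins for illustration (all names here are hypothetical):
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   
   // Stand-in for the interface: no mapping function in the signature;
   // each query type maps its own raw results internally.
   interface DataServerRowFetcher<RowType>
   {
     List<RowType> fetchRowsFromDataServer(List<List<Object>> rawResults);
   }
   
   // Scan-specific implementation: the scan-to-row mapping (flattening each
   // raw result batch into an Object[] row) lives inside the impl, so callers
   // never see it. A group-by impl would do its own transform here instead.
   class ScanRowFetcher implements DataServerRowFetcher<Object[]>
   {
     @Override
     public List<Object[]> fetchRowsFromDataServer(List<List<Object>> rawResults)
     {
       return rawResults.stream().map(List::toArray).collect(Collectors.toList());
     }
   }
   ```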



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProvider.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Query;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+public interface LoadedSegmentDataProvider
+{
+  <ReturnType, QueryType> Pair<DataServerQueryStatus, Sequence<ReturnType>> fetchRowsFromDataServer(

Review Comment:
   Please document the interface.
   Nit: let's change the name `ReturnType` to something else, maybe `RowType`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java:
##########
@@ -33,30 +42,35 @@
 public class SegmentWithDescriptor
 {
   private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
+  private final LoadedSegmentDataProvider loadedSegmentDataProvider;
   private final SegmentDescriptor descriptor;
 
   /**
    * Create a new instance.
    *
-   * @param segmentSupplier supplier of a {@link ResourceHolder} of segment. The {@link ResourceHolder#close()} logic
-   *                        must include a delegated call to {@link Segment#close()}.
-   * @param descriptor      segment descriptor
+   * @param segmentSupplier           supplier of a {@link ResourceHolder} of segment. The {@link ResourceHolder#close()}
+   *                                  logic must include a delegated call to {@link Segment#close()}.
+   * @param loadedSegmentDataProvider {@link LoadedSegmentDataProvider} which fetches the corresponding results from a
+   *                                  data server where the segment is loaded. The call will fetch the
+   * @param descriptor                segment descriptor
    */
   public SegmentWithDescriptor(
       final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
+      final LoadedSegmentDataProvider loadedSegmentDataProvider,
       final SegmentDescriptor descriptor

Review Comment:
   Can we change the `segmentDescriptor` to `richSegmentDescriptor`?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input.table;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.druid.jackson.CommaListJoinDeserializer;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Data segment including the locations which contain the segment. Used if MSQ needs to fetch the segment from a server
+ * instead of from deep storage.
+ */
+public class DataSegmentWithLocation extends DataSegment
+{
+  private final Set<DruidServerMetadata> servers;
+
+  @JsonCreator
+  public DataSegmentWithLocation(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("version") String version,
+      // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
+      @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
+      @JsonProperty("dimensions")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+      List<String> dimensions,
+      @JsonProperty("metrics")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+      List<String> metrics,
+      @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
+      @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
+      @JsonProperty("binaryVersion") Integer binaryVersion,
+      @JsonProperty("size") long size,
+      @JsonProperty("servers") Set<DruidServerMetadata> servers,

Review Comment:
   Can this be nullable?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1158,14 +1163,42 @@ private QueryKit makeQueryControllerToolKit()
 
   private DataSegmentTimelineView makeDataSegmentTimelineView()
   {
+    final boolean isIncludeRealtime = MultiStageQueryContext.isIncludeRealtime(task.getQuerySpec().getQuery().context());
+
     return (dataSource, intervals) -> {
-      final Collection<DataSegment> dataSegments =
+      final Iterable<ImmutableSegmentLoadInfo> serverViewSegments;
+
+      // Fetch the realtime segments first, so that we don't miss any segment if they get handed off between the two calls.
+      if (isIncludeRealtime) {
+        serverViewSegments = context.coordinatorClient().fetchServerViewSegments(dataSource, intervals);
+      } else {
+        serverViewSegments = ImmutableList.of();
+      }
+
+      final Collection<DataSegment> metadataStoreSegments =
           FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource, intervals), true);
 
-      if (dataSegments.isEmpty()) {
+      Set<DataSegment> unifiedSegmentView = new HashSet<>(metadataStoreSegments);
+      for (ImmutableSegmentLoadInfo segmentLoadInfo : serverViewSegments) {
+        ImmutableSet<DruidServerMetadata> servers = segmentLoadInfo.getServers();
+        Set<DruidServerMetadata> realtimeServerMetadata = servers.stream()
+            .filter(druidServerMetadata ->
+                        ServerType.REALTIME.equals(druidServerMetadata.getType()) ||
+                        ServerType.INDEXER_EXECUTOR.equals(druidServerMetadata.getType()))
+            .collect(Collectors.toSet());
+        if (!realtimeServerMetadata.isEmpty()) {

Review Comment:
   Can you please add a comment here noting that we are only setting the data segment location for realtime segments. In the future, we will also set it for segments that are loaded on historicals.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/LoadedSegmentDataProviderImpl.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.discovery.DataServerClient;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.query.Queries;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.context.DefaultResponseContext;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.rpc.FixedSetServiceLocator;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+public class LoadedSegmentDataProviderImpl implements LoadedSegmentDataProvider
+{
+  private static final Logger log = new Logger(LoadedSegmentDataProviderImpl.class);
+  private static final int DEFAULT_NUM_TRIES = 5;
+  private final RichSegmentDescriptor segmentDescriptor;
+  private final String dataSource;
+  private final ChannelCounters channelCounters;
+  private final ServiceClientFactory serviceClientFactory;
+  private final CoordinatorClient coordinatorClient;
+  private final ObjectMapper objectMapper;
+
+  public LoadedSegmentDataProviderImpl(
+      RichSegmentDescriptor segmentDescriptor,
+      String dataSource,
+      ChannelCounters channelCounters,
+      ServiceClientFactory serviceClientFactory,
+      CoordinatorClient coordinatorClient,
+      ObjectMapper objectMapper
+  )
+  {
+    this.segmentDescriptor = segmentDescriptor;
+    this.dataSource = dataSource;
+    this.channelCounters = channelCounters;
+    this.serviceClientFactory = serviceClientFactory;
+    this.coordinatorClient = coordinatorClient;
+    this.objectMapper = objectMapper;
+  }
+
+  @Override
+  public <ReturnType, QueryType> Pair<DataServerQueryStatus, Yielder<ReturnType>> fetchRowsFromDataServer(

Review Comment:
   Each query type should implement this method so that it can do the per-query transforms.
   That way the mapping function can be removed.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -156,7 +156,12 @@ public ReturnOrAwait<Long> runIncrementally(final IntSet readableInputs) throws
     } else if (readableInputs.size() != inputChannels.size()) {
       return ReturnOrAwait.awaitAll(inputChannels.size());
     } else if (baseInput.hasSegment()) {
-      return runWithSegment(baseInput.getSegment());
+      SegmentWithDescriptor segment = baseInput.getSegment();

Review Comment:
   It feels weird that we are making the segmentLoadingProvider changes in the `SegmentWithDescriptor`.
   
   Instead, expose a method `segment.isLoadedOnServer()` in the segment descriptor and make the appropriate changes in the base leaf frame processor so that it understands a segment with a location.
   So your `RichSegmentDescriptor` should have the necessary information, like the location, for fetching the segment from indexers.
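   A minimal sketch of the suggested check, with the descriptor simplified to plain Java for illustration (the class, field, and method names here are hypothetical):
   
   ```java
   import java.util.Collections;
   import java.util.Set;
   
   // Simplified stand-in for the suggested RichSegmentDescriptor: carries the
   // set of data servers (e.g. indexers) on which the segment is loaded.
   class RichSegmentDescriptorSketch
   {
     private final Set<String> servers;
   
     RichSegmentDescriptorSketch(Set<String> servers)
     {
       this.servers = servers == null ? Collections.emptySet() : servers;
     }
   
     // BaseLeafFrameProcessor can branch on this: fetch rows from the data
     // server when true, read the segment from deep storage otherwise.
     boolean isLoadedOnServer()
     {
       return !servers.isEmpty();
     }
   }
   ```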



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

