This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 55814888f5 MSQ: Only look at sqlInsertSegmentGranularity on the outer query. (#13537)
55814888f5 is described below

commit 55814888f54ebd6a909c60cc0638bf4ea306349f
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Dec 9 07:18:16 2022 -0800

    MSQ: Only look at sqlInsertSegmentGranularity on the outer query. (#13537)
    
    The planner sets sqlInsertSegmentGranularity in its context when using
    PARTITIONED BY, so it ends up set on every native query in the stack
    (all native queries for a SQL query typically share the same context).
    QueryKit would interpret that as a request to configure bucketing for
    all native queries. This is unwanted, since bucketing is only used for
    the penultimate stage in INSERT / REPLACE.
    
    This patch therefore modifies QueryKit to look at sqlInsertSegmentGranularity
    only on the outermost query.
    
    As an additional change, this patch replaces the static ObjectMapper with
    the process-wide ObjectMapper for deserializing Granularities. This saves
    an ObjectMapper instance and ensures that any special serdes registered
    for Granularity are picked up.
---
 .../java/org/apache/druid/msq/exec/ControllerImpl.java   |  2 +-
 .../org/apache/druid/msq/querykit/DataSourcePlan.java    | 16 +++++++++++++++-
 .../org/apache/druid/msq/querykit/QueryKitUtils.java     | 15 ++++++++-------
 .../groupby/GroupByPostShuffleFrameProcessor.java        | 14 ++++++++++----
 .../groupby/GroupByPostShuffleFrameProcessorFactory.java |  3 ++-
 .../druid/msq/querykit/groupby/GroupByQueryKit.java      | 11 ++++++++++-
 .../druid/msq/querykit/scan/ScanQueryFrameProcessor.java |  7 +++++--
 .../querykit/scan/ScanQueryFrameProcessorFactory.java    |  3 ++-
 .../org/apache/druid/msq/querykit/scan/ScanQueryKit.java |  3 ++-
 .../java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java  |  1 +
 .../msq/querykit/scan/ScanQueryFrameProcessorTest.java   |  4 +++-
 11 files changed, 59 insertions(+), 20 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 318c33a759..528baa4c27 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -989,7 +989,7 @@ public class ControllerImpl implements Controller
     final Map<Class<? extends Query>, QueryKit> kitMap =
         ImmutableMap.<Class<? extends Query>, QueryKit>builder()
                     .put(ScanQuery.class, new 
ScanQueryKit(context.jsonMapper()))
-                    .put(GroupByQuery.class, new GroupByQueryKit())
+                    .put(GroupByQuery.class, new 
GroupByQueryKit(context.jsonMapper()))
                     .build();
 
     return new MultiQueryKit(kitMap);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index a5c61c5bd7..30544cf31b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -67,6 +68,16 @@ import java.util.stream.Collectors;
  */
 public class DataSourcePlan
 {
+  /**
+   * A map with {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} set to 
null, so we can clear it from the context
+   * of subqueries.
+   */
+  private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY 
= new HashMap<>();
+
+  static {
+    
CONTEXT_MAP_NO_SEGMENT_GRANULARITY.put(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
 null);
+  }
+
   private final DataSource newDataSource;
   private final List<InputSpec> inputSpecs;
   private final IntSet broadcastInputs;
@@ -247,7 +258,10 @@ public class DataSourcePlan
   {
     final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
         queryId,
-        dataSource.getQuery(),
+
+        // Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in 
the context. It's only used for the
+        // outermost query, and setting it for the subquery makes us 
erroneously add bucketing where it doesn't belong.
+        
dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
         queryKit,
         ShuffleSpecFactories.subQueryWithMaxWorkerCount(maxWorkerCount),
         maxWorkerCount,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
index fcd7232916..1f863a8c73 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/QueryKitUtils.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.calcite.sql.dialect.CalciteSqlDialect;
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.SortColumn;
-import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -76,15 +75,16 @@ public class QueryKitUtils
    */
   public static final String CTX_TIME_COLUMN_NAME = "__timeColumn";
 
-  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
-
-  public static Granularity getSegmentGranularityFromContext(@Nullable final 
Map<String, Object> context)
+  public static Granularity getSegmentGranularityFromContext(
+      final ObjectMapper objectMapper,
+      @Nullable final Map<String, Object> context
+  )
   {
     final Object o = context == null ? null : 
context.get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY);
 
     if (o instanceof String) {
       try {
-        return OBJECT_MAPPER.readValue((String) o, Granularity.class);
+        return objectMapper.readValue((String) o, Granularity.class);
       }
       catch (JsonProcessingException e) {
         throw new ISE("Invalid segment granularity [%s]", o);
@@ -188,9 +188,10 @@ public class QueryKitUtils
    * @throws IllegalArgumentException if the provided granularity is not 
supported
    */
   @Nullable
-  public static VirtualColumn makeSegmentGranularityVirtualColumn(final 
Query<?> query)
+  public static VirtualColumn makeSegmentGranularityVirtualColumn(final 
ObjectMapper jsonMapper, final Query<?> query)
   {
-    final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(query.getContext());
+    final Granularity segmentGranularity =
+        QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
query.getContext());
     final String timeColumnName = 
query.context().getString(QueryKitUtils.CTX_TIME_COLUMN_NAME);
 
     if (timeColumnName == null || 
Granularities.ALL.equals(segmentGranularity)) {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
index a44c14e3cd..207fe53de0 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit.groupby;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
@@ -92,7 +93,8 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Long>
       final FrameReader frameReader,
       final RowSignature resultSignature,
       final ClusterBy clusterBy,
-      final MemoryAllocator allocator
+      final MemoryAllocator allocator,
+      final ObjectMapper jsonMapper
   )
   {
     this.query = query;
@@ -107,7 +109,7 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Long>
     this.finalizeFn = makeFinalizeFn(query);
     this.havingSpec = cloneHavingSpec(query);
     this.columnSelectorFactoryForFrameWriter =
-        makeVirtualColumnsForFrameWriter(query).wrap(
+        makeVirtualColumnsForFrameWriter(jsonMapper, query).wrap(
             RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
                 query,
                 () -> outputRow,
@@ -311,9 +313,13 @@ public class GroupByPostShuffleFrameProcessor implements 
FrameProcessor<Long>
    * Create virtual columns containing "bonus" fields that should be attached 
to the {@link FrameWriter} for
    * this processor. Kept in sync with the signature generated by {@link 
GroupByQueryKit}.
    */
-  private static VirtualColumns makeVirtualColumnsForFrameWriter(final 
GroupByQuery query)
+  private static VirtualColumns makeVirtualColumnsForFrameWriter(
+      final ObjectMapper jsonMapper,
+      final GroupByQuery query
+  )
   {
-    final VirtualColumn segmentGranularityVirtualColumn = 
QueryKitUtils.makeSegmentGranularityVirtualColumn(query);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
 
     if (segmentGranularityVirtualColumn == null) {
       return VirtualColumns.EMPTY;
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java
index 5987eb02fa..ffb8bacf5e 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessorFactory.java
@@ -118,7 +118,8 @@ public class GroupByPostShuffleFrameProcessorFactory 
extends BaseFrameProcessorF
               readableInput.getChannelFrameReader(),
               stageDefinition.getSignature(),
               stageDefinition.getClusterBy(),
-              outputChannel.getFrameMemoryAllocator()
+              outputChannel.getFrameMemoryAllocator(),
+              frameContext.jsonMapper()
           );
         }
     );
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 411fe118a2..402d2dfa3d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit.groupby;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.SortColumn;
@@ -56,6 +57,13 @@ import java.util.Optional;
 
 public class GroupByQueryKit implements QueryKit<GroupByQuery>
 {
+  private final ObjectMapper jsonMapper;
+
+  public GroupByQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
   @Override
   public QueryDefinition makeQueryDefinition(
       final String queryId,
@@ -85,7 +93,8 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
     final GroupByQuery queryToRun = (GroupByQuery) 
originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
     final int firstStageNumber = Math.max(minStageNumber, 
queryDefBuilder.getNextStageNumber());
 
-    final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext());
+    final Granularity segmentGranularity =
+        QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
queryToRun.getContext());
     final RowSignature intermediateSignature = 
computeIntermediateSignature(queryToRun);
     final ClusterBy resultClusterBy =
         
QueryKitUtils.clusterByWithSegmentGranularity(computeClusterByForResults(queryToRun),
 segmentGranularity);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 307d274c73..0482e2715d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit.scan;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -92,7 +93,8 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
       final ResourceHolder<WritableFrameChannel> outputChannel,
       final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
       @Nullable final AtomicLong runningCountForLimit,
-      final long memoryReservedForBroadcastJoin
+      final long memoryReservedForBroadcastJoin,
+      final ObjectMapper jsonMapper
   )
   {
     super(
@@ -111,7 +113,8 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
     final List<VirtualColumn> frameWriterVirtualColumns = new ArrayList<>();
     frameWriterVirtualColumns.add(partitionBoostVirtualColumn);
 
-    final VirtualColumn segmentGranularityVirtualColumn = 
QueryKitUtils.makeSegmentGranularityVirtualColumn(query);
+    final VirtualColumn segmentGranularityVirtualColumn =
+        QueryKitUtils.makeSegmentGranularityVirtualColumn(jsonMapper, query);
 
     if (segmentGranularityVirtualColumn != null) {
       frameWriterVirtualColumns.add(segmentGranularityVirtualColumn);
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 2a948fd456..bda53af696 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -100,7 +100,8 @@ public class ScanQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactor
             allocatorHolder
         )),
         runningCountForLimit,
-        frameContext.memoryParameters().getBroadcastJoinMemory()
+        frameContext.memoryParameters().getBroadcastJoinMemory(),
+        frameContext.jsonMapper()
     );
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
index 5bfb70b52c..9e44f152eb 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java
@@ -116,7 +116,8 @@ public class ScanQueryKit implements QueryKit<ScanQuery>
       signatureToUse = scanSignature;
     } else {
       final RowSignature.Builder signatureBuilder = 
RowSignature.builder().addAll(scanSignature);
-      final Granularity segmentGranularity = 
QueryKitUtils.getSegmentGranularityFromContext(queryToRun.getContext());
+      final Granularity segmentGranularity =
+          QueryKitUtils.getSegmentGranularityFromContext(jsonMapper, 
queryToRun.getContext());
       final List<SortColumn> clusterByColumns = new ArrayList<>();
 
       // Add regular orderBys.
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index a91844114d..2ec08e0378 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -207,6 +207,7 @@ public class MSQTaskSqlEngine implements SqlEngine
 
     try {
       segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(
+          plannerContext.getJsonMapper(),
           plannerContext.queryContextMap()
       );
     }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
index 2ea2958c73..d93e8df42d 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.frame.write.FrameWriter;
 import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -152,7 +153,8 @@ public class ScanQueryFrameProcessorTest extends 
InitializedNullHandlingTest
         },
         new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
         null,
-        0L
+        0L,
+        new DefaultObjectMapper()
     );
 
     ListenableFuture<Long> retVal = exec.runFully(processor, null);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to