This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 40414cfe780 MSQ window functions: Reject MVDs during window processing 
(#17036)
40414cfe780 is described below

commit 40414cfe780b5b45375f98b26b5b61b9ca58d05f
Author: Akshat Jain <[email protected]>
AuthorDate: Mon Sep 23 11:39:35 2024 +0530

    MSQ window functions: Reject MVDs during window processing (#17036)
    
    * MSQ window functions: Reject MVDs during window processing
    
    * MSQ window functions: Reject MVDs during window processing
    
    * Remove parameterization from MSQWindowTest
---
 .../WindowOperatorQueryFrameProcessor.java         |  10 +
 .../org/apache/druid/msq/exec/MSQWindowTest.java   | 367 ++++++++++-----------
 .../rowsandcols/RearrangedRowsAndColumns.java      |  13 +-
 3 files changed, 200 insertions(+), 190 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 3dc62f3a60d..aab8f1f1a6b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -34,6 +34,7 @@ import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.frame.util.SettableLongVirtualColumn;
 import org.apache.druid.frame.write.FrameWriter;
 import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.Unit;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.indexing.error.MSQException;
@@ -54,6 +55,7 @@ import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.NullableTypeStrategy;
 import org.apache.druid.segment.column.RowSignature;
 
@@ -451,6 +453,14 @@ public class WindowOperatorQueryFrameProcessor implements 
FrameProcessor<Object>
     int match = 0;
     for (String columnName : partitionColumnNames) {
       int i = frameReader.signature().indexOf(columnName);
+      if 
(ColumnType.STRING.equals(frameReader.signature().getColumnType(columnName).get())
 && (row1.get(i) instanceof List || row2.get(i) instanceof List)) {
+        // special handling to reject MVDs
+        throw new UOE(
+            "Encountered a multi value column [%s]. Window processing does not 
support MVDs. "
+            + "Consider using UNNEST or MV_TO_ARRAY.",
+            columnName
+        );
+      }
       if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) {
         match++;
       }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
index f694fc9f39f..f423b5153a0 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.MSQSpec;
@@ -64,11 +65,10 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.timeline.SegmentId;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.hamcrest.CoreMatchers;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -77,19 +77,8 @@ import java.util.Map;
 
 public class MSQWindowTest extends MSQTestBase
 {
-  public static Collection<Object[]> data()
-  {
-    Object[][] data = new Object[][]{
-        {DEFAULT, DEFAULT_MSQ_CONTEXT}
-    };
-
-    return Arrays.asList(data);
-  }
-
-
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithPartitionByAndInnerGroupBy(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithPartitionByAndInnerGroupBy()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -108,7 +97,7 @@ public class MSQWindowTest extends MSQTestBase
                                                    ColumnType.FLOAT
                                                )
                                            ))
-                                           .setContext(context)
+                                           .setContext(DEFAULT_MSQ_CONTEXT)
                                            .build();
 
 
@@ -121,7 +110,7 @@ public class MSQWindowTest extends MSQTestBase
     final WindowOperatorQuery query = new WindowOperatorQuery(
         new QueryDataSource(groupByQuery),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new 
NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))),
@@ -151,7 +140,7 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0},
             new Object[]{6.0f, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with().totalFiles(1),
@@ -170,9 +159,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -198,7 +186,7 @@ public class MSQWindowTest extends MSQTestBase
                                                    ColumnType.DOUBLE
                                                )
                                            ))
-                                           .setContext(context)
+                                           .setContext(DEFAULT_MSQ_CONTEXT)
                                            .build();
 
 
@@ -215,7 +203,7 @@ public class MSQWindowTest extends MSQTestBase
     final WindowOperatorQuery query = new WindowOperatorQuery(
         new QueryDataSource(groupByQuery),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("d0", ColumnType.FLOAT)
                     .add("d1", ColumnType.DOUBLE)
@@ -258,7 +246,7 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0, 5.0, 21.0},
             new Object[]{6.0f, 6.0, 6.0, 21.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with().totalFiles(1),
@@ -277,12 +265,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void 
testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy(
-      String contextName,
-      Map<String, Object> context
-  )
+  @Test
+  public void 
testWindowOnFooWith2WindowsBothWindowsHavingPartitionByInnerGroupBy()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -308,7 +292,7 @@ public class MSQWindowTest extends MSQTestBase
                                                    ColumnType.DOUBLE
                                                )
                                            ))
-                                           .setContext(context)
+                                           .setContext(DEFAULT_MSQ_CONTEXT)
                                            .build();
 
 
@@ -325,7 +309,7 @@ public class MSQWindowTest extends MSQTestBase
     final WindowOperatorQuery query = new WindowOperatorQuery(
         new QueryDataSource(groupByQuery),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("d0", ColumnType.FLOAT)
                     .add("d1", ColumnType.DOUBLE)
@@ -372,7 +356,7 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0, 5.0, 5.0},
             new Object[]{6.0f, 6.0, 6.0, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with().totalFiles(1),
@@ -391,12 +375,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed(
-      String contextName,
-      Map<String, Object> context
-  )
+  @Test
+  public void testWindowOnFooWith2WindowsBothPartitionByWithOrderReversed()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -421,7 +401,7 @@ public class MSQWindowTest extends MSQTestBase
                                                    ColumnType.DOUBLE
                                                )
                                            ))
-                                           .setContext(context)
+                                           .setContext(DEFAULT_MSQ_CONTEXT)
                                            .build();
 
 
@@ -438,7 +418,7 @@ public class MSQWindowTest extends MSQTestBase
     final WindowOperatorQuery query = new WindowOperatorQuery(
         new QueryDataSource(groupByQuery),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("d0", ColumnType.FLOAT)
                     .add("d1", ColumnType.DOUBLE)
@@ -485,7 +465,7 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0, 5.0, 5.0},
             new Object[]{6.0f, 6.0, 6.0, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with().totalFiles(1),
@@ -504,9 +484,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithEmptyOverWithGroupBy(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithEmptyOverWithGroupBy()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -525,7 +504,7 @@ public class MSQWindowTest extends MSQTestBase
                                                    ColumnType.FLOAT
                                                )
                                            ))
-                                           .setContext(context)
+                                           .setContext(DEFAULT_MSQ_CONTEXT)
                                            .build();
 
 
@@ -538,7 +517,7 @@ public class MSQWindowTest extends MSQTestBase
     final WindowOperatorQuery query = new WindowOperatorQuery(
         new QueryDataSource(groupByQuery),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new NaivePartitioningOperatorFactory(ImmutableList.of()),
@@ -567,7 +546,7 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 21.0},
             new Object[]{6.0f, 21.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .setExpectedCountersForStageWorkerChannel(
             CounterSnapshotMatcher
                 .with().totalFiles(1),
@@ -586,9 +565,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithNoGroupByAndPartition(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithNoGroupByAndPartition()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -603,7 +581,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(DruidQuery.CTX_SCAN_SIGNATURE, 
"[{\"name\":\"m1\",\"type\":\"FLOAT\"}]")
                     .build();
 
@@ -617,7 +595,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new 
NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("m1"))),
@@ -647,13 +625,12 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0},
             new Object[]{6.0f, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithNoGroupByAndPartitionOnTwoElements()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -668,7 +645,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]"
@@ -685,7 +662,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new NaiveSortOperatorFactory(ImmutableList.of(
@@ -718,13 +695,12 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0},
             new Object[]{6.0f, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithNoGroupByAndPartitionByAnother(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithNoGroupByAndPartitionByAnother()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -739,7 +715,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"m2\",\"type\":\"DOUBLE\"}]"
@@ -756,7 +732,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new 
NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("m2"))),
@@ -786,13 +762,12 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0},
             new Object[]{6.0f, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithGroupByAndInnerLimit(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithGroupByAndInnerLimit()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("m1", ColumnType.FLOAT)
@@ -825,10 +800,10 @@ public class MSQWindowTest extends MSQTestBase
                             )
                         ))
                         .setLimit(5)
-                        .setContext(context)
+                        .setContext(DEFAULT_MSQ_CONTEXT)
                         .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("d0", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new NaivePartitioningOperatorFactory(ImmutableList.of()),
@@ -861,17 +836,16 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{4.0f, 15.0},
             new Object[]{5.0f, 15.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithNoGroupByAndPartitionAndVirtualColumns()
   {
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"m1\",\"type\":\"FLOAT\"},{\"name\":\"v0\",\"type\":\"LONG\"}]"
@@ -901,7 +875,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("v0", ColumnType.LONG)
                     .add("m1", ColumnType.FLOAT)
@@ -936,19 +910,18 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{3, 5.0f, 5.0},
             new Object[]{3, 6.0f, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithNoGroupByAndEmptyOver(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithNoGroupByAndEmptyOver()
   {
 
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(DruidQuery.CTX_SCAN_SIGNATURE, 
"[{\"name\":\"m1\",\"type\":\"FLOAT\"}]")
                     .build();
 
@@ -973,7 +946,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("m1", ColumnType.FLOAT).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new NaivePartitioningOperatorFactory(ImmutableList.of()),
@@ -1002,17 +975,16 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 21.0},
             new Object[]{6.0f, 21.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithPartitionByOrderBYWithJoin(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithPartitionByOrderBYWithJoin()
   {
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@@ -1021,7 +993,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature1 =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]"
@@ -1070,7 +1042,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("m1", ColumnType.FLOAT)
                     .add("w0", ColumnType.DOUBLE)
@@ -1106,17 +1078,16 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0, 5.0},
             new Object[]{6.0f, 6.0, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithEmptyOverWithJoin(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithEmptyOverWithJoin()
   {
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"j0.m2\",\"type\":\"DOUBLE\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@@ -1125,7 +1096,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature1 =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"m2\",\"type\":\"DOUBLE\"},{\"name\":\"v0\",\"type\":\"FLOAT\"}]"
@@ -1174,7 +1145,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("m1", ColumnType.FLOAT)
                     .add("w0", ColumnType.DOUBLE)
@@ -1209,13 +1180,12 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 21.0, 5.0},
             new Object[]{6.0f, 21.0, 6.0}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithDim2(String contextName, Map<String, Object> 
context)
+  @Test
+  public void testWindowOnFooWithDim2()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("dim2", ColumnType.STRING)
@@ -1230,7 +1200,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"dim2\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@@ -1247,7 +1217,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("dim2", ColumnType.STRING).add("w0", 
ColumnType.DOUBLE).build(),
         ImmutableList.of(
             new 
NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("dim2"))),
@@ -1287,17 +1257,16 @@ public class MSQWindowTest extends MSQTestBase
                 new Object[]{"abc", 5.0},
                 new Object[]{null, 8.0}
             ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithEmptyOverWithUnnest(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithEmptyOverWithUnnest()
   {
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@@ -1333,7 +1302,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("m1", ColumnType.FLOAT)
                     .add("w0", ColumnType.DOUBLE)
@@ -1370,17 +1339,16 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 24.0, NullHandling.sqlCompatible() ? null : ""},
             new Object[]{6.0f, 24.0, NullHandling.sqlCompatible() ? null : ""}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnFooWithPartitionByAndWithUnnest(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testWindowOnFooWithPartitionByAndWithUnnest()
   {
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"j0.unnest\",\"type\":\"STRING\"},{\"name\":\"m1\",\"type\":\"FLOAT\"}]"
@@ -1416,7 +1384,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder()
                     .add("m1", ColumnType.FLOAT)
                     .add("w0", ColumnType.DOUBLE)
@@ -1454,14 +1422,13 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{5.0f, 5.0, NullHandling.sqlCompatible() ? null : ""},
             new Object[]{6.0f, 6.0, NullHandling.sqlCompatible() ? null : ""}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
   // Insert Tests
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testInsertWithWindow(String contextName, Map<String, Object> 
context)
+  @Test
+  public void testInsertWithWindow()
   {
     List<Object[]> expectedRows = ImmutableList.of(
         new Object[]{946684800000L, 1.0f, 1.0},
@@ -1484,7 +1451,7 @@ public class MSQWindowTest extends MSQTestBase
                          + "SUM(m1) OVER(PARTITION BY m1) as summ1\n"
                          + "from foo\n"
                          + "GROUP BY __time, m1 PARTITIONED BY ALL")
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedResultRows(expectedRows)
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
@@ -1492,9 +1459,8 @@ public class MSQWindowTest extends MSQTestBase
 
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testInsertWithWindowEmptyOver(String contextName, Map<String, 
Object> context)
+  @Test
+  public void testInsertWithWindowEmptyOver()
   {
     List<Object[]> expectedRows = ImmutableList.of(
         new Object[]{946684800000L, 1.0f, 21.0},
@@ -1517,7 +1483,7 @@ public class MSQWindowTest extends MSQTestBase
                          + "SUM(m1) OVER() as summ1\n"
                          + "from foo\n"
                          + "GROUP BY __time, m1 PARTITIONED BY ALL")
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedResultRows(expectedRows)
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
@@ -1525,9 +1491,8 @@ public class MSQWindowTest extends MSQTestBase
 
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testInsertWithWindowPartitionByOrderBy(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testInsertWithWindowPartitionByOrderBy()
   {
     List<Object[]> expectedRows = ImmutableList.of(
         new Object[]{946684800000L, 1.0f, 1.0},
@@ -1550,7 +1515,7 @@ public class MSQWindowTest extends MSQTestBase
                          + "SUM(m1) OVER(PARTITION BY m1 ORDER BY m1 ASC) as 
summ1\n"
                          + "from foo\n"
                          + "GROUP BY __time, m1 PARTITIONED BY ALL")
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedResultRows(expectedRows)
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
@@ -1560,9 +1525,8 @@ public class MSQWindowTest extends MSQTestBase
 
 
   // Replace Tests
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testReplaceWithWindowsAndUnnest(String contextName, Map<String, 
Object> context)
+  @Test
+  public void testReplaceWithWindowsAndUnnest()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -1576,7 +1540,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY ALL CLUSTERED BY m1")
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -1594,9 +1558,8 @@ public class MSQWindowTest extends MSQTestBase
                      .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSimpleWindowWithPartitionBy(String contextName, Map<String, 
Object> context)
+  @Test
+  public void testSimpleWindowWithPartitionBy()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -1609,7 +1572,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY ALL CLUSTERED BY m1")
                      .setExpectedDataSource("foo")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -1625,9 +1588,8 @@ public class MSQWindowTest extends MSQTestBase
                      .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSimpleWindowWithEmptyOver(String contextName, Map<String, 
Object> context)
+  @Test
+  public void testSimpleWindowWithEmptyOver()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -1640,7 +1602,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY ALL CLUSTERED BY m1")
                      .setExpectedDataSource("foo")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -1656,9 +1618,8 @@ public class MSQWindowTest extends MSQTestBase
                      .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSimpleWindowWithEmptyOverNoGroupBy(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testSimpleWindowWithEmptyOverNoGroupBy()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -1671,7 +1632,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY ALL CLUSTERED BY m1")
                      .setExpectedDataSource("foo")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -1687,9 +1648,8 @@ public class MSQWindowTest extends MSQTestBase
                      .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSimpleWindowWithDuplicateSelectNode(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testSimpleWindowWithDuplicateSelectNode()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -1703,7 +1663,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY ALL CLUSTERED BY m1")
                      .setExpectedDataSource("foo")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -1719,9 +1679,8 @@ public class MSQWindowTest extends MSQTestBase
                      .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSimpleWindowWithJoins(String contextName, Map<String, 
Object> context)
+  @Test
+  public void testSimpleWindowWithJoins()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -1735,7 +1694,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY DAY CLUSTERED BY m1")
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -1761,9 +1720,8 @@ public class MSQWindowTest extends MSQTestBase
   }
 
   // Bigger dataset tests
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSelectWithWikipedia(String contextName, Map<String, Object> 
context)
+  @Test
+  public void testSelectWithWikipedia()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("cityName", ColumnType.STRING)
@@ -1779,7 +1737,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> contextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"}]"
@@ -1797,7 +1755,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(contextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("cityName", ColumnType.STRING)
                     .add("added", ColumnType.LONG)
                     .add("w0", ColumnType.LONG).build(),
@@ -1830,17 +1788,16 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{"Albuquerque", 9L, 140L},
             new Object[]{"Albuquerque", 2L, 140L}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSelectWithWikipediaEmptyOverWithCustomContext(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testSelectWithWikipediaEmptyOverWithCustomContext()
   {
     final Map<String, Object> customContext =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     
.put(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW, 200)
                     .build();
 
@@ -1852,9 +1809,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testSelectWithWikipediaWithPartitionKeyNotInSelect(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testSelectWithWikipediaWithPartitionKeyNotInSelect()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("cityName", ColumnType.STRING)
@@ -1870,7 +1826,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> innerContextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"}]"
@@ -1888,7 +1844,7 @@ public class MSQWindowTest extends MSQTestBase
                 .context(innerContextWithRowSignature)
                 .build()),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("cityName", ColumnType.STRING)
                     .add("added", ColumnType.LONG)
                     .add("w0", ColumnType.LONG).build(),
@@ -1902,7 +1858,7 @@ public class MSQWindowTest extends MSQTestBase
 
     final Map<String, Object> outerContextWithRowSignature =
         ImmutableMap.<String, Object>builder()
-                    .putAll(context)
+                    .putAll(DEFAULT_MSQ_CONTEXT)
                     .put(
                         DruidQuery.CTX_SCAN_SIGNATURE,
                         
"[{\"name\":\"added\",\"type\":\"LONG\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"w0\",\"type\":\"LONG\"}]"
@@ -1941,13 +1897,12 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{"Tokyo", 0L, 12615L},
             new Object[]{"Santiago", 161L, 401L}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testGroupByWithWikipedia(String contextName, Map<String, Object> 
context)
+  @Test
+  public void testGroupByWithWikipedia()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("cityName", ColumnType.STRING)
@@ -1972,7 +1927,7 @@ public class MSQWindowTest extends MSQTestBase
                                                    ColumnType.LONG
                                                )
                                            ))
-                                           .setContext(context)
+                                           .setContext(DEFAULT_MSQ_CONTEXT)
                                            .build();
 
 
@@ -1985,7 +1940,7 @@ public class MSQWindowTest extends MSQTestBase
     final WindowOperatorQuery query = new WindowOperatorQuery(
         new QueryDataSource(groupByQuery),
         new LegacySegmentSpec(Intervals.ETERNITY),
-        context,
+        DEFAULT_MSQ_CONTEXT,
         RowSignature.builder().add("d0", ColumnType.STRING)
                     .add("d1", ColumnType.LONG)
                     .add("w0", ColumnType.LONG).build(),
@@ -2019,13 +1974,12 @@ public class MSQWindowTest extends MSQTestBase
             new Object[]{"Albuquerque", 9L, 140L},
             new Object[]{"Albuquerque", 129L, 140L}
         ))
-        .setQueryContext(context)
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testReplaceGroupByOnWikipedia(String contextName, Map<String, 
Object> context)
+  @Test
+  public void testReplaceGroupByOnWikipedia()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -2041,7 +1995,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY ALL CLUSTERED BY added")
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -2055,11 +2009,10 @@ public class MSQWindowTest extends MSQTestBase
                      .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String 
contextName, Map<String, Object> context)
+  @Test
+  public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers()
   {
-    final Map<String, Object> multipleWorkerContext = new HashMap<>(context);
+    final Map<String, Object> multipleWorkerContext = new 
HashMap<>(DEFAULT_MSQ_CONTEXT);
     multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 5);
 
     final RowSignature rowSignature = RowSignature.builder()
@@ -2286,9 +2239,8 @@ public class MSQWindowTest extends MSQTestBase
         .verifyResults();
   }
 
-  @MethodSource("data")
-  @ParameterizedTest(name = "{index}:with context {0}")
-  public void testReplaceWithPartitionedByDayOnWikipedia(String contextName, 
Map<String, Object> context)
+  @Test
+  public void testReplaceWithPartitionedByDayOnWikipedia()
   {
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
@@ -2304,7 +2256,7 @@ public class MSQWindowTest extends MSQTestBase
                              + "PARTITIONED BY DAY")
                      .setExpectedDataSource("foo1")
                      .setExpectedRowSignature(rowSignature)
-                     .setQueryContext(context)
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
                      .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
                      .setExpectedResultRows(
                          ImmutableList.of(
@@ -2323,4 +2275,41 @@ public class MSQWindowTest extends MSQTestBase
                      )))
                      .verifyResults();
   }
+
+  @Test
+  public void testFailurePartitionByMVD_1()
+  {
+    testSelectQuery()
+        .setSql("select cityName, countryName, 
array_to_mv(array[1,length(cityName)]), "
+                + "row_number() over (partition by  
array_to_mv(array[1,length(cityName)]) order by countryName, cityName)\n"
+                + "from wikipedia\n"
+                + "where countryName in ('Austria', 'Republic of Korea') and 
cityName is not null\n"
+                + "order by 1, 2, 3")
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
+        .setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
+            CoreMatchers.instanceOf(ISE.class),
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
+                "Encountered a multi value column [v0]. Window processing does 
not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
+        ))
+        .verifyExecutionError();
+  }
+
+  @Test
+  public void testFailurePartitionByMVD_2()
+  {
+    testSelectQuery()
+        .setSql("  select cityName, countryName, 
array_to_mv(array[1,length(cityName)]),"
+                + "row_number() over (partition by countryName order by 
countryName, cityName) as c1,\n"
+                + "row_number() over (partition by  
array_to_mv(array[1,length(cityName)]) order by countryName, cityName) as c2\n"
+                + "from wikipedia\n"
+                + "where countryName in ('Austria', 'Republic of Korea') and 
cityName is not null\n"
+                + "order by 1, 2, 3")
+        .setQueryContext(DEFAULT_MSQ_CONTEXT)
+        .setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
+            CoreMatchers.instanceOf(ISE.class),
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
+                "Encountered a multi value column [v0]. Window processing does 
not support MVDs. Consider using UNNEST or MV_TO_ARRAY."))
+        ))
+        .verifyExecutionError();
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
index e64f086edd7..a5c2528dd1b 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java
@@ -20,6 +20,7 @@
 package org.apache.druid.query.rowsandcols;
 
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
@@ -28,6 +29,7 @@ import org.apache.druid.segment.column.ColumnType;
 import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -128,7 +130,16 @@ public class RearrangedRowsAndColumns implements 
RowsAndColumns
             @Override
             public Object getObject(int rowNum)
             {
-              return accessor.getObject(pointers[start + rowNum]);
+              Object value = accessor.getObject(pointers[start + rowNum]);
+              if (ColumnType.STRING.equals(getType()) && value instanceof 
List) {
+                // special handling to reject MVDs
+                throw new UOE(
+                    "Encountered a multi value column [%s]. Window processing 
does not support MVDs. "
+                    + "Consider using UNNEST or MV_TO_ARRAY.",
+                    name
+                );
+              }
+              return value;
             }
 
             @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to