This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d851985cf5 MSQ: Add support for indexSpec. (#13275)
d851985cf5 is described below
commit d851985cf5b0faf8e6630c3011dc80288e959f28
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Oct 28 14:27:50 2022 -0700
MSQ: Add support for indexSpec. (#13275)
---
docs/multi-stage-query/reference.md | 3 +-
.../apache/druid/msq/indexing/MSQTuningConfig.java | 28 +++++++--
.../SegmentGeneratorFrameProcessorFactory.java | 2 +-
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 9 +--
.../druid/msq/util/MultiStageQueryContext.java | 67 +++++++++++-----------
.../druid/msq/indexing/MSQTuningConfigTest.java | 23 +++++---
.../druid/msq/util/MultiStageQueryContextTest.java | 57 ++++++++++++++++--
.../java/org/apache/druid/query/QueryContexts.java | 2 +-
.../java/org/apache/druid/segment/IndexSpec.java | 1 +
website/.spelling | 3 +-
10 files changed, 139 insertions(+), 56 deletions(-)
diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index eb70f143d4..a50e7991dc 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -202,8 +202,7 @@ The following table lists the context parameters for the MSQ task engine:
| segmentSortOrder | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, c [...]
| maxParseExceptions| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
| rowsPerSegment | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
-| sqlTimeZone | Sets the time zone for this connection, which affects how time functions and timestamp literals behave. Use a time zone name like "America/Los_Angeles" or offset like "-08:00".| `druid.sql.planner.sqlTimeZone` on the Broker (default: UTC)|
-| useApproximateCountDistinct | Whether to use an approximate cardinality algorithm for `COUNT(DISTINCT foo)`.| `druid.sql.planner.useApproximateCountDistinct` on the Broker (default: true)|
+| indexSpec | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
## Limits
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
index 391bb9d674..6d499f4f0e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.segment.IndexSpec;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -55,20 +56,25 @@ public class MSQTuningConfig
@Nullable
private final Integer rowsPerSegment;
+ @Nullable
+ private final IndexSpec indexSpec;
+
public MSQTuningConfig(
@JsonProperty("maxNumWorkers") @Nullable final Integer maxNumWorkers,
@JsonProperty("maxRowsInMemory") @Nullable final Integer maxRowsInMemory,
- @JsonProperty("rowsPerSegment") @Nullable final Integer rowsPerSegment
+ @JsonProperty("rowsPerSegment") @Nullable final Integer rowsPerSegment,
+ @JsonProperty("indexSpec") @Nullable final IndexSpec indexSpec
)
{
this.maxNumWorkers = maxNumWorkers;
this.maxRowsInMemory = maxRowsInMemory;
this.rowsPerSegment = rowsPerSegment;
+ this.indexSpec = indexSpec;
}
public static MSQTuningConfig defaultConfig()
{
- return new MSQTuningConfig(null, null, null);
+ return new MSQTuningConfig(null, null, null, null);
}
@JsonProperty("maxNumWorkers")
@@ -92,6 +98,13 @@ public class MSQTuningConfig
return rowsPerSegment;
}
+ @JsonProperty("indexSpec")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ IndexSpec getIndexSpecForSerialization()
+ {
+ return indexSpec;
+ }
+
public int getMaxNumWorkers()
{
return maxNumWorkers != null ? maxNumWorkers : DEFAULT_MAX_NUM_TASKS;
@@ -107,6 +120,11 @@ public class MSQTuningConfig
return rowsPerSegment != null ? rowsPerSegment :
PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT;
}
+ public IndexSpec getIndexSpec()
+ {
+ return indexSpec != null ? indexSpec : new IndexSpec();
+ }
+
@Override
public boolean equals(Object o)
{
@@ -119,13 +137,14 @@ public class MSQTuningConfig
MSQTuningConfig that = (MSQTuningConfig) o;
return Objects.equals(maxNumWorkers, that.maxNumWorkers)
&& Objects.equals(maxRowsInMemory, that.maxRowsInMemory)
- && Objects.equals(rowsPerSegment, that.rowsPerSegment);
+ && Objects.equals(rowsPerSegment, that.rowsPerSegment)
+ && Objects.equals(indexSpec, that.indexSpec);
}
@Override
public int hashCode()
{
- return Objects.hash(maxNumWorkers, maxRowsInMemory, rowsPerSegment);
+ return Objects.hash(maxNumWorkers, maxRowsInMemory, rowsPerSegment,
indexSpec);
}
@Override
@@ -135,6 +154,7 @@ public class MSQTuningConfig
"maxNumWorkers=" + maxNumWorkers +
", maxRowsInMemory=" + maxRowsInMemory +
", rowsPerSegment=" + rowsPerSegment +
+ ", indexSpec=" + indexSpec +
'}';
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/SegmentGeneratorFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/SegmentGeneratorFrameProcessorFactory.java
index 6019f4bffd..2796075e5a 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/SegmentGeneratorFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/SegmentGeneratorFrameProcessorFactory.java
@@ -291,7 +291,7 @@ public class SegmentGeneratorFrameProcessorFactory
@Override
public IndexSpec getIndexSpec()
{
- return new IndexSpec();
+ return tuningConfig.getIndexSpec();
}
@Override
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 523a75d659..6beb0763bf 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -47,6 +47,7 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
@@ -151,6 +152,8 @@ public class MSQTaskQueryMaker implements QueryMaker
DEFAULT_ROWS_IN_MEMORY
);
+ final IndexSpec indexSpec =
MultiStageQueryContext.getIndexSpec(queryContext, jsonMapper);
+
final boolean finalizeAggregations =
MultiStageQueryContext.isFinalizeAggregations(queryContext);
final List<Interval> replaceTimeChunks =
@@ -215,9 +218,7 @@ public class MSQTaskQueryMaker implements QueryMaker
throw new ISE("Unable to convert %s to a segment granularity",
segmentGranularity);
}
- final List<String> segmentSortOrder =
MultiStageQueryContext.decodeSortOrder(
- MultiStageQueryContext.getSortOrder(queryContext)
- );
+ final List<String> segmentSortOrder =
MultiStageQueryContext.getSortOrder(queryContext);
validateSegmentSortOrder(
segmentSortOrder,
@@ -249,7 +250,7 @@ public class MSQTaskQueryMaker implements QueryMaker
.columnMappings(new ColumnMappings(columnMappings))
.destination(destination)
.assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(queryContext))
- .tuningConfig(new MSQTuningConfig(maxNumWorkers,
maxRowsInMemory, rowsPerSegment))
+ .tuningConfig(new MSQTuningConfig(maxNumWorkers,
maxRowsInMemory, rowsPerSegment, indexSpec))
.build();
final MSQControllerTask controllerTask = new MSQControllerTask(
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 97eb5d2a19..c7c61fd30c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -25,18 +25,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.msq.sql.MSQMode;
import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.segment.IndexSpec;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -73,7 +72,8 @@ public class MultiStageQueryContext
* CLUSTERED BY clause) but it can be overridden.
*/
public static final String CTX_SORT_ORDER = "segmentSortOrder";
- private static final String DEFAULT_SORT_ORDER = null;
+
+ public static final String CTX_INDEX_SPEC = "indexSpec";
private static final Pattern LOOKS_LIKE_JSON_ARRAY =
Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
@@ -143,41 +143,23 @@ public class MultiStageQueryContext
);
}
- @Nullable
- public static Object getValueFromPropertyMap(
- Map<String, Object> propertyMap,
- String key,
- @Nullable List<String> aliases,
- @Nullable Object defaultValue
- )
+ public static List<String> getSortOrder(final QueryContext queryContext)
{
- if (propertyMap.get(key) != null) {
- return propertyMap.get(key);
- }
-
- if (aliases != null) {
- for (String legacyKey : aliases) {
- if (propertyMap.get(legacyKey) != null) {
- return propertyMap.get(legacyKey);
- }
- }
- }
-
- return defaultValue;
+ return
MultiStageQueryContext.decodeSortOrder(queryContext.getString(CTX_SORT_ORDER));
}
- public static String getSortOrder(final QueryContext queryContext)
+ @Nullable
+ public static IndexSpec getIndexSpec(final QueryContext queryContext, final
ObjectMapper objectMapper)
{
- return queryContext.getString(
- CTX_SORT_ORDER,
- DEFAULT_SORT_ORDER
- );
+ return decodeIndexSpec(queryContext.get(CTX_INDEX_SPEC), objectMapper);
}
/**
* Decodes {@link #CTX_SORT_ORDER} from either a JSON or CSV string.
*/
- public static List<String> decodeSortOrder(@Nullable final String
sortOrderString)
+ @Nullable
+ @VisibleForTesting
+ static List<String> decodeSortOrder(@Nullable final String sortOrderString)
{
if (sortOrderString == null) {
return Collections.emptyList();
@@ -188,7 +170,7 @@ public class MultiStageQueryContext
return new ObjectMapper().readValue(sortOrderString, new
TypeReference<List<String>>() {});
}
catch (JsonProcessingException e) {
- throw new IAE("Invalid JSON provided for [%s]", CTX_SORT_ORDER);
+ throw QueryContexts.badValueException(CTX_SORT_ORDER, "CSV or JSON
array", sortOrderString);
}
} else {
final RFC4180Parser csvParser = new
RFC4180ParserBuilder().withSeparator(',').build();
@@ -200,8 +182,29 @@ public class MultiStageQueryContext
.collect(Collectors.toList());
}
catch (IOException e) {
- throw new IAE("Invalid CSV provided for [%s]", CTX_SORT_ORDER);
+ throw QueryContexts.badValueException(CTX_SORT_ORDER, "CSV or JSON
array", sortOrderString);
}
}
}
+
+ /**
+ * Decodes {@link #CTX_INDEX_SPEC} from either a JSON-encoded string, or
POJOs.
+ */
+ @Nullable
+ @VisibleForTesting
+ static IndexSpec decodeIndexSpec(@Nullable final Object indexSpecObject,
final ObjectMapper objectMapper)
+ {
+ try {
+ if (indexSpecObject == null) {
+ return null;
+ } else if (indexSpecObject instanceof String) {
+ return objectMapper.readValue((String) indexSpecObject,
IndexSpec.class);
+ } else {
+ return objectMapper.convertValue(indexSpecObject, IndexSpec.class);
+ }
+ }
+ catch (Exception e) {
+ throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec",
indexSpecObject);
+ }
+ }
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java
index 82ed89306e..258ee25e5d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java
@@ -21,7 +21,9 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.StringEncodingStrategy;
import org.junit.Assert;
import org.junit.Test;
@@ -45,15 +47,22 @@ public class MSQTuningConfigTest
public void testSerdeNonDefault() throws Exception
{
final ObjectMapper mapper = TestHelper.makeJsonMapper();
- final MSQTuningConfig config = new MSQTuningConfig(2, 3, 4);
-
- Assert.assertEquals(
- config,
- mapper.readValue(
- mapper.writeValueAsString(config),
- MSQTuningConfig.class
+ final MSQTuningConfig config = new MSQTuningConfig(
+ 2,
+ 3,
+ 4,
+ new IndexSpec(
+ null,
+ null,
+ new StringEncodingStrategy.FrontCoded(null),
+ null,
+ null,
+ null,
+ null
)
);
+
+ Assert.assertEquals(config,
mapper.readValue(mapper.writeValueAsString(config), MSQTuningConfig.class));
}
@Test
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 8f0876f1b1..0153cbc38e 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -19,15 +19,23 @@
package org.apache.druid.msq.util;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.query.BadQueryContextException;
import org.apache.druid.query.QueryContext;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.column.StringEncodingStrategy;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -73,7 +81,10 @@ public class MultiStageQueryContextTest
@Test
public void getAssignmentStrategy_noParameterSetReturnsDefaultValue()
{
- Assert.assertEquals(WorkerAssignmentStrategy.MAX,
MultiStageQueryContext.getAssignmentStrategy(QueryContext.empty()));
+ Assert.assertEquals(
+ WorkerAssignmentStrategy.MAX,
+ MultiStageQueryContext.getAssignmentStrategy(QueryContext.empty())
+ );
}
@Test
@@ -156,20 +167,53 @@ public class MultiStageQueryContextTest
Assert.assertEquals(ImmutableList.of(), decodeSortOrder(""));
Assert.assertEquals(ImmutableList.of(), decodeSortOrder(null));
- Assert.assertThrows(IllegalArgumentException.class, () ->
decodeSortOrder("[["));
+ Assert.assertThrows(BadQueryContextException.class, () ->
decodeSortOrder("[["));
+ }
+
+ @Test
+ public void testGetIndexSpec()
+ {
+ Assert.assertNull(decodeIndexSpec(null));
+
+ Assert.assertEquals(new IndexSpec(), decodeIndexSpec("{}"));
+ Assert.assertEquals(new IndexSpec(),
decodeIndexSpec(Collections.emptyMap()));
+
+ Assert.assertEquals(
+ new IndexSpec(null, null, new StringEncodingStrategy.FrontCoded(null),
null, null, null, null),
+
decodeIndexSpec("{\"stringDictionaryEncoding\":{\"type\":\"frontCoded\"}}")
+ );
+
+ Assert.assertEquals(
+ new IndexSpec(null, null, new StringEncodingStrategy.FrontCoded(null),
null, null, null, null),
+ decodeIndexSpec(ImmutableMap.of("stringDictionaryEncoding",
ImmutableMap.of("type", "frontCoded")))
+ );
+
+ final BadQueryContextException e = Assert.assertThrows(
+ BadQueryContextException.class,
+ () -> decodeIndexSpec("{")
+ );
+
+ MatcherAssert.assertThat(
+ e,
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+ "Expected key [indexSpec] to be an indexSpec, but got [{]"))
+ );
}
@Test
public void getSortOrderNoParameterSetReturnsDefaultValue()
{
-
Assert.assertNull(MultiStageQueryContext.getSortOrder(QueryContext.empty()));
+ Assert.assertEquals(Collections.emptyList(),
MultiStageQueryContext.getSortOrder(QueryContext.empty()));
}
@Test
public void getSortOrderParameterSetReturnsCorrectValue()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_SORT_ORDER, "a,
b,\"c,d\"");
- Assert.assertEquals("a, b,\"c,d\"",
MultiStageQueryContext.getSortOrder(QueryContext.of(propertyMap)));
+ Assert.assertEquals(
+ ImmutableList.of("a", "b", "c,d"),
+ MultiStageQueryContext.getSortOrder(QueryContext.of(propertyMap))
+ );
}
@Test
@@ -189,4 +233,9 @@ public class MultiStageQueryContextTest
{
return MultiStageQueryContext.decodeSortOrder(input);
}
+
+ private static IndexSpec decodeIndexSpec(@Nullable final Object
inputSpecObject)
+ {
+ return MultiStageQueryContext.decodeIndexSpec(inputSpecObject, new
ObjectMapper());
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index c06c036244..f3106d9a60 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -457,7 +457,7 @@ public class QueryContexts
{
return new BadQueryContextException(
StringUtils.format(
- "Expected key [%s] to be in %s, but got [%s]",
+ "Expected key [%s] to be %s, but got [%s]",
key,
expected,
actual
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
index 4358444126..d216659e54 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
@@ -171,6 +171,7 @@ public class IndexSpec
}
@JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public SegmentizerFactory getSegmentLoader()
{
diff --git a/website/.spelling b/website/.spelling
index 5f9e60ff58..a99b50868f 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -694,7 +694,8 @@ WorkerRpcFailed
# MSQ context parameters
maxNumTasks
taskAssignment
-finalizeAggregations
+finalizeAggregations
+indexSpec
rowsInMemory
segmentSortOrder
rowsPerSegment
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]