This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9177419628 Unnest functionality for Druid (#13268)
9177419628 is described below
commit 91774196285ea51b823883394ef1bfe8f2417892
Author: somu-imply <[email protected]>
AuthorDate: Fri Dec 2 18:48:25 2022 -0800
Unnest functionality for Druid (#13268)
* Moving all unnest cursor code atop refactored code for unnest
* Updating unnest cursor
* Removing dedup and fixing up some null checks
* AllowList changes
* Fixing some NPEs
* Using bitset for allowlist
* Updating the initialization only when cursor is in non-done state
* Updating code to skip rows not in allow list
* Adding a flag for cases when first element is not in allowed list
* Updating for a null in allowList
* Splitting unnest cursor into 2 subclasses
* Intercepting some apis with columnName for new unnested column
* Adding test cases and renaming some stuff
* checkstyle fixes
* Moving to an interface for Unnest
* handling null rows in a dimension
* Updating cursors after comments part-1
* Addressing comments and adding some more tests
* Reverting a change to ScanQueryRunner and improving a comment
* removing an unused function
* Updating cursors after comments part 2
* One last fix for review comments
* Making some functions private, deleting some comments, adding a test for
unnest of unnest with allowList
* Adding an exception for a case
* Closure for unnest data source
* Adding some javadocs
* One minor change in makeDimSelector of columnarCursor
* Updating an error message
* Update
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java
Co-authored-by: Abhishek Agarwal
<[email protected]>
* Unnesting on virtual columns was missing an object array, adding that to
support virtual columns unnesting
* Updating exceptions to use UOE
* Renamed files, added column capability test on adapter, return statement
and made unnest datasource not cacheable for the time being
* Handling for null values in dim selector
* Fixing a NPE for null row
* Updating capabilities
* Updating capabilities
Co-authored-by: Abhishek Agarwal
<[email protected]>
---
.../java/org/apache/druid/query/DataSource.java | 3 +-
.../org/apache/druid/query/UnnestDataSource.java | 212 +++++++
.../druid/query/planning/DataSourceAnalysis.java | 39 +-
.../segment/UnnestColumnValueSelectorCursor.java | 336 +++++++++++
.../druid/segment/UnnestDimensionCursor.java | 415 ++++++++++++++
.../druid/segment/UnnestSegmentReference.java | 115 ++++
.../apache/druid/segment/UnnestStorageAdapter.java | 234 ++++++++
.../java/org/apache/druid/segment/ListCursor.java | 228 ++++++++
.../UnnestColumnValueSelectorCursorTest.java | 632 +++++++++++++++++++++
.../druid/segment/UnnestStorageAdapterTest.java | 399 +++++++++++++
10 files changed, 2602 insertions(+), 11 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java
b/processing/src/main/java/org/apache/druid/query/DataSource.java
index f56a3550a3..43dfb3be85 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -41,7 +41,8 @@ import java.util.function.Function;
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
- @JsonSubTypes.Type(value = GlobalTableDataSource.class, name =
"globalTable")
+ @JsonSubTypes.Type(value = GlobalTableDataSource.class, name =
"globalTable"),
+ @JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest")
})
public interface DataSource
{
diff --git
a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
new file mode 100644
index 0000000000..4623701674
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.UnnestSegmentReference;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+/**
+ * The data source for representing an unnest operation.
+ * <p>
+ * An unnest data source has the following:
+ * <ul>
+ *   <li>a base data source which is to be unnested</li>
+ *   <li>the name of the multi-value dimension (MVD) column which will be unnested</li>
+ *   <li>the name of the column that will hold the unnested values</li>
+ *   <li>an optional allowList serving as a filter of which values in the MVD will be unnested</li>
+ * </ul>
+ * Results of this data source are currently never cached (see {@link #isCacheable(boolean)}).
+ */
+public class UnnestDataSource implements DataSource
+{
+  private final DataSource base;
+  private final String column;
+  private final String outputName;
+  @Nullable
+  private final LinkedHashSet<String> allowList;
+
+  private UnnestDataSource(
+      DataSource dataSource,
+      String columnName,
+      String outputName,
+      @Nullable LinkedHashSet<String> allowList
+  )
+  {
+    this.base = dataSource;
+    this.column = columnName;
+    this.outputName = outputName;
+    this.allowList = allowList;
+  }
+
+  /**
+   * Creates an unnest data source. This is also the Jackson entry point for data sources of type "unnest".
+   *
+   * @param base       the data source whose rows are unnested
+   * @param columnName the column whose values are unnested; a null or empty name leaves the base segments as-is
+   * @param outputName the name of the column that holds the unnested values
+   * @param allowList  optional set of values to keep while unnesting; null or empty keeps every value
+   */
+  @JsonCreator
+  public static UnnestDataSource create(
+      @JsonProperty("base") DataSource base,
+      @JsonProperty("column") String columnName,
+      @JsonProperty("outputName") String outputName,
+      @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
+  )
+  {
+    return new UnnestDataSource(base, columnName, outputName, allowList);
+  }
+
+  @JsonProperty("base")
+  public DataSource getBase()
+  {
+    return base;
+  }
+
+  @JsonProperty("column")
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty("outputName")
+  public String getOutputName()
+  {
+    return outputName;
+  }
+
+  @Nullable
+  @JsonProperty("allowList")
+  public LinkedHashSet<String> getAllowList()
+  {
+    return allowList;
+  }
+
+  @Override
+  public Set<String> getTableNames()
+  {
+    return base.getTableNames();
+  }
+
+  @Override
+  public List<DataSource> getChildren()
+  {
+    return ImmutableList.of(base);
+  }
+
+  @Override
+  public DataSource withChildren(List<DataSource> children)
+  {
+    if (children.size() != 1) {
+      throw new IAE("Expected [1] child, got [%d]", children.size());
+    }
+    return new UnnestDataSource(children.get(0), column, outputName, allowList);
+  }
+
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    // The cache key would have to include the unnested column; until then, never cache.
+    return false;
+  }
+
+  @Override
+  public boolean isGlobal()
+  {
+    return base.isGlobal();
+  }
+
+  @Override
+  public boolean isConcrete()
+  {
+    return base.isConcrete();
+  }
+
+  /**
+   * Returns the base data source's segment mapping function, wrapped so that every mapped segment is
+   * additionally exposed through an {@link UnnestSegmentReference} over {@link #getColumn()}.
+   * When no column to unnest is configured (null or empty), the base mapping function is returned unchanged.
+   */
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTimeAccumulator
+  )
+  {
+    final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(
+        query,
+        cpuTimeAccumulator
+    );
+    return JvmUtils.safeAccumulateThreadCpuTime(
+        cpuTimeAccumulator,
+        () -> {
+          if (column == null || column.isEmpty()) {
+            return segmentMapFn;
+          }
+          return baseSegment ->
+              new UnnestSegmentReference(
+                  segmentMapFn.apply(baseSegment),
+                  column,
+                  outputName,
+                  allowList
+              );
+        }
+    );
+  }
+
+  @Override
+  public DataSource withUpdatedDataSource(DataSource newSource)
+  {
+    return new UnnestDataSource(newSource, column, outputName, allowList);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    // The column being unnested (and the allowList) would need to be part of the cache key,
+    // as the results depend on what is being unnested.
+    // Currently, this data source is not cacheable.
+    // Future development should use the table name and column name to
+    // create an appropriate cache key.
+    return null;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UnnestDataSource that = (UnnestDataSource) o;
+    // The allowList changes which rows are produced, so it must take part in equality.
+    // Objects.equals is used because column, outputName and allowList may be null.
+    return Objects.equals(column, that.column)
+           && Objects.equals(outputName, that.outputName)
+           && Objects.equals(allowList, that.allowList)
+           && Objects.equals(base, that.base);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(base, column, outputName, allowList);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "UnnestDataSource{" +
+           "base=" + base +
+           ", column='" + column + '\'' +
+           ", outputName='" + outputName + '\'' +
+           ", allowList=" + allowList +
+           '}';
+  }
+}
+
+
diff --git
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index c329e3a570..63c2c8b815 100644
---
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
@@ -112,17 +113,29 @@ public class DataSourceAnalysis
Query<?> baseQuery = null;
DataSource current = dataSource;
- while (current instanceof QueryDataSource) {
- final Query<?> subQuery = ((QueryDataSource) current).getQuery();
+ // This needs to be an or condition between QueryDataSource and
UnnestDataSource
+ // As queries can have interleaving query and unnest data sources.
+ // Ideally if each data source generate their own analysis object we can
avoid the or here
+ // and have cleaner code. Especially as we increase the types of data
sources in future
+ // these or checks will be tedious. Future development should move
forDataSource method
+ // into each data source.
- if (!(subQuery instanceof BaseQuery)) {
- // We must verify that the subQuery is a BaseQuery, because it is
required to make "getBaseQuerySegmentSpec"
- // work properly. All built-in query types are BaseQuery, so we only
expect this with funky extension queries.
- throw new IAE("Cannot analyze subquery of class[%s]",
subQuery.getClass().getName());
- }
+ while (current instanceof QueryDataSource || current instanceof
UnnestDataSource) {
+ if (current instanceof QueryDataSource) {
+ final Query<?> subQuery = ((QueryDataSource) current).getQuery();
+
+ if (!(subQuery instanceof BaseQuery)) {
+ // We must verify that the subQuery is a BaseQuery, because it is
required to make "getBaseQuerySegmentSpec"
+ // work properly. All built-in query types are BaseQuery, so we only
expect this with funky extension queries.
+ throw new IAE("Cannot analyze subquery of class[%s]",
subQuery.getClass().getName());
+ }
- baseQuery = subQuery;
- current = subQuery.getDataSource();
+ baseQuery = subQuery;
+ current = subQuery.getDataSource();
+ } else {
+ final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
+ current = unnestDataSource.getBase();
+ }
}
if (current instanceof JoinDataSource) {
@@ -276,7 +289,8 @@ public class DataSourceAnalysis
/**
* Returns true if this datasource is concrete-based (see {@link
#isConcreteBased()}, and the base datasource is a
- * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of
{@link TableDataSource}. This is an
+ * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of
{@link TableDataSource}
+ * or an {@link UnnestDataSource} composed entirely of {@link
TableDataSource} . This is an
* important property, because it corresponds to datasources that can be
handled by Druid's distributed query stack.
*/
public boolean isConcreteTableBased()
@@ -286,6 +300,10 @@ public class DataSourceAnalysis
// so check anyway for future-proofing.
return isConcreteBased() && (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource
&&
+ baseDataSource.getChildren()
+ .stream()
+ .allMatch(ds -> ds
instanceof TableDataSource))
+ || (baseDataSource instanceof
UnnestDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds
instanceof TableDataSource)));
@@ -298,6 +316,7 @@ public class DataSourceAnalysis
{
return dataSource instanceof QueryDataSource;
}
+
/**
* Returns true if this datasource is made out of a join operation
diff --git
a/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java
b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java
new file mode 100644
index 0000000000..db4acb893e
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+/**
+ * The cursor to help unnest MVDs without dictionary encoding and ARRAY type
selectors.
+ * <p>
+ * Consider a segment has 2 rows
+ * ['a', 'b', 'c']
+ * ['d', 'e']
+ * <p>
+ * The baseCursor points to the row ['a', 'b', 'c']
+ * while the unnestCursor with each call of advance() moves over individual
elements.
+ * <p>
+ * unnestCursor.advance() -> 'a'
+ * unnestCursor.advance() -> 'b'
+ * unnestCursor.advance() -> 'c'
+ * unnestCursor.advance() -> 'd' (advances base cursor first)
+ * unnestCursor.advance() -> 'e'
+ * <p>
+ * <p>
+ * The allowSet if available helps skip over elements which are not in the
allowList by moving the cursor to
+ * the next available match.
+ * <p>
+ * The index reference points to the index of each row that the unnest cursor
is accessing through currentVal
+ * The index ranges from 0 to the size of the list in each row which is held
in the unnestListForCurrentRow
+ * <p>
+ * The needInitialization flag sets up the initial values of
unnestListForCurrentRow at the beginning of the segment
+ */
+public class UnnestColumnValueSelectorCursor implements Cursor
+{
+ private final Cursor baseCursor;
+ private final ColumnSelectorFactory baseColumnSelectorFactory;
+ private final ColumnValueSelector columnValueSelector;
+ private final String columnName;
+ private final String outputName;
+ private final LinkedHashSet<String> allowSet;
+ private int index;
+ private Object currentVal;
+ private List<Object> unnestListForCurrentRow;
+ private boolean needInitialization;
+
+ public UnnestColumnValueSelectorCursor(
+ Cursor cursor,
+ ColumnSelectorFactory baseColumSelectorFactory,
+ String columnName,
+ String outputColumnName,
+ LinkedHashSet<String> allowSet
+ )
+ {
+ this.baseCursor = cursor;
+ this.baseColumnSelectorFactory = baseColumSelectorFactory;
+ this.columnValueSelector =
this.baseColumnSelectorFactory.makeColumnValueSelector(columnName);
+ this.columnName = columnName;
+ this.index = 0;
+ this.outputName = outputColumnName;
+ this.needInitialization = true;
+ this.allowSet = allowSet;
+ }
+
+ @Override
+ public ColumnSelectorFactory getColumnSelectorFactory()
+ {
+ return new ColumnSelectorFactory()
+ {
+ @Override
+ public DimensionSelector makeDimensionSelector(DimensionSpec
dimensionSpec)
+ {
+ if (!outputName.equals(dimensionSpec.getDimension())) {
+ return
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+ }
+ throw new UOE("Unsupported dimension selector while using column value
selector for column [%s]", outputName);
+ }
+
+ @Override
+ public ColumnValueSelector makeColumnValueSelector(String columnName)
+ {
+ if (!outputName.equals(columnName)) {
+ return baseColumnSelectorFactory.makeColumnValueSelector(columnName);
+ }
+ return new ColumnValueSelector()
+ {
+ @Override
+ public double getDouble()
+ {
+ Object value = getObject();
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+ throw new UOE("Cannot convert object to double");
+ }
+
+ @Override
+ public float getFloat()
+ {
+ Object value = getObject();
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).floatValue();
+ }
+ throw new UOE("Cannot convert object to float");
+ }
+
+ @Override
+ public long getLong()
+ {
+ Object value = getObject();
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+ throw new UOE("Cannot convert object to long");
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ columnValueSelector.inspectRuntimeShape(inspector);
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return getObject() == null;
+ }
+
+ @Nullable
+ @Override
+ public Object getObject()
+ {
+ if (!unnestListForCurrentRow.isEmpty()) {
+ if (allowSet == null || allowSet.isEmpty()) {
+ return unnestListForCurrentRow.get(index);
+ } else if (allowSet.contains((String)
unnestListForCurrentRow.get(index))) {
+ return unnestListForCurrentRow.get(index);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Class<?> classOfObject()
+ {
+ return Object.class;
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ if (!outputName.equals(column)) {
+ return baseColumnSelectorFactory.getColumnCapabilities(column);
+ }
+ final ColumnCapabilities capabilities =
baseColumnSelectorFactory.getColumnCapabilities(columnName);
+ if (capabilities.isArray()) {
+ return
ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
+ }
+ if (capabilities.hasMultipleValues().isTrue()) {
+ return
ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
+ }
+ return baseColumnSelectorFactory.getColumnCapabilities(columnName);
+ }
+ };
+ }
+
+ @Override
+ public DateTime getTime()
+ {
+ return baseCursor.getTime();
+ }
+
+ @Override
+ public void advance()
+ {
+ advanceUninterruptibly();
+ BaseQuery.checkInterrupted();
+ }
+
+ @Override
+ public void advanceUninterruptibly()
+ {
+ do {
+ advanceAndUpdate();
+ } while (matchAndProceed());
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ if (needInitialization && !baseCursor.isDone()) {
+ initialize();
+ }
+ return baseCursor.isDone();
+ }
+
+ @Override
+ public boolean isDoneOrInterrupted()
+ {
+ if (needInitialization && !baseCursor.isDoneOrInterrupted()) {
+ initialize();
+ }
+ return baseCursor.isDoneOrInterrupted();
+ }
+
+ @Override
+ public void reset()
+ {
+ index = 0;
+ needInitialization = true;
+ baseCursor.reset();
+ }
+
+ /**
+ * This method populates the objects when the base cursor moves to the next
row
+ *
+ * @param firstRun flag to populate one time object references to hold
values for unnest cursor
+ */
+ private void getNextRow(boolean firstRun)
+ {
+ currentVal = this.columnValueSelector.getObject();
+ if (currentVal == null) {
+ if (!firstRun) {
+ unnestListForCurrentRow = new ArrayList<>();
+ }
+ unnestListForCurrentRow.add(null);
+ } else {
+ if (currentVal instanceof List) {
+ unnestListForCurrentRow = (List<Object>) currentVal;
+ } else if (currentVal instanceof Object[]) {
+ unnestListForCurrentRow = Arrays.asList((Object[]) currentVal);
+ } else if (currentVal.getClass().equals(String.class)) {
+ if (!firstRun) {
+ unnestListForCurrentRow = new ArrayList<>();
+ }
+ unnestListForCurrentRow.add(currentVal);
+ }
+ }
+ }
+
+ /**
+ * This initializes the unnest cursor and creates data structures
+ * to start iterating over the values to be unnested.
+ * This would also create a bitset for dictonary encoded columns to
+ * check for matching values specified in allowedList of UnnestDataSource.
+ */
+ private void initialize()
+ {
+ this.unnestListForCurrentRow = new ArrayList<>();
+ getNextRow(needInitialization);
+ if (allowSet != null) {
+ if (!allowSet.isEmpty()) {
+ if (!allowSet.contains((String) unnestListForCurrentRow.get(index))) {
+ advance();
+ }
+ }
+ }
+ needInitialization = false;
+ }
+
+ /**
+ * This advances the cursor to move to the next element to be unnested.
+ * When the last element in a row is unnested, it is also responsible
+ * to move the base cursor to the next row for unnesting and repopulates
+ * the data structures, created during initialize(), to point to the new row
+ */
+ private void advanceAndUpdate()
+ {
+ if (unnestListForCurrentRow.isEmpty() || index >=
unnestListForCurrentRow.size() - 1) {
+ index = 0;
+ baseCursor.advance();
+ if (!baseCursor.isDone()) {
+ getNextRow(needInitialization);
+ }
+ } else {
+ index++;
+ }
+ }
+
+ /**
+ * This advances the unnest cursor in cases where an allowList is specified
+ * and the current value at the unnest cursor is not in the allowList.
+ * The cursor in such cases is moved till the next match is found.
+ *
+ * @return a boolean to indicate whether to stay or move cursor
+ */
+ private boolean matchAndProceed()
+ {
+ boolean matchStatus;
+ if (allowSet == null || allowSet.isEmpty()) {
+ matchStatus = true;
+ } else {
+ matchStatus = allowSet.contains((String)
unnestListForCurrentRow.get(index));
+ }
+ return !baseCursor.isDone() && !matchStatus;
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
new file mode 100644
index 0000000000..46a2c626ca
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+/**
+ * The cursor to help unnest MVDs with dictionary encoding.
+ * Consider a segment has 2 rows
+ * ['a', 'b', 'c']
+ * ['d', 'c']
+ * <p>
+ * Considering dictionary encoding, these are represented as
+ * <p>
+ * 'a' -> 0
+ * 'b' -> 1
+ * 'c' -> 2
+ * 'd' -> 3
+ * <p>
+ * The baseCursor points to the row of IndexedInts [0, 1, 2]
+ * while the unnestCursor with each call of advance() moves over individual elements.
+ * <p>
+ * advance() -> 0 -> 'a'
+ * advance() -> 1 -> 'b'
+ * advance() -> 2 -> 'c'
+ * advance() -> 3 -> 'd' (advances base cursor first)
+ * advance() -> 2 -> 'c'
+ * <p>
+ * Total 5 advance calls above
+ * <p>
+ * The allowSet, if available, helps skip over elements that are not in the allowList by moving the cursor to
+ * the next available match. When the underlying selector offers an {@link IdLookup}, the allowSet is converted
+ * into a bitset of dictionary ids (during initialization) for efficiency; otherwise values are matched by name.
+ * If allowSet is ['c', 'd'] then the advance moves over to the next available match
+ * <p>
+ * advance() -> 2 -> 'c'
+ * advance() -> 3 -> 'd' (advances base cursor first)
+ * advance() -> 2 -> 'c'
+ * <p>
+ * Total 3 advance calls in this case
+ * <p>
+ * A base row without any value (an empty IndexedInts) produces a single null row when no allowList is given,
+ * and is skipped otherwise.
+ * <p>
+ * The index field points to the element of the current base row that the unnest cursor is exposing.
+ * The IndexedInts of the current base row are held in indexedIntsForCurrentRow.
+ * <p>
+ * The needInitialization flag makes the first call to isDone()/isDoneOrInterrupted() (and the first one after
+ * reset()) set up indexedIntsForCurrentRow from the first base row.
+ */
+public class UnnestDimensionCursor implements Cursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  @Nullable
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  // Single-element view of the current unnested value, returned by getRow() of the output selector.
+  private final SingleIndexInts indexIntsForRow;
+  private int index;
+  @Nullable
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+  // True when allowSet was translated into allowedBitSet; false means allowed values are matched by name.
+  private boolean allowListResolvedToIds;
+
+  public UnnestDimensionCursor(
+      Cursor cursor,
+      ColumnSelectorFactory baseColumnSelectorFactory,
+      String columnName,
+      String outputColumnName,
+      @Nullable LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = baseColumnSelectorFactory;
+    this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    this.allowedBitSet = new BitSet();
+    this.indexIntsForRow = new SingleIndexInts();
+  }
+
+  /**
+   * Returns a factory that serves the unnested value for the output column and delegates every other
+   * column to the base cursor's factory.
+   */
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            // A view over the current unnested value: size 1, or size 0 when the base row has no values.
+            return indexIntsForRow;
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            final IdLookup lookup = idLookup();
+            if (lookup == null) {
+              // Ids cannot be resolved up front; compare looked-up names instead.
+              final Predicate<String> equalsValue = v -> value == null ? v == null : value.equals(v);
+              return DimensionSelectorUtils.makeValueMatcherGeneric(this, equalsValue);
+            }
+            final int idForLookup = lookup.lookupId(value);
+            return new ValueMatcher()
+            {
+              @Override
+              public boolean matches()
+              {
+                if (!hasCurrentValue()) {
+                  // A row without values reads as null, so it only matches a null value.
+                  return value == null;
+                }
+                // Ids in a row are never negative, so a value unknown to the dictionary never matches.
+                return idForLookup >= 0 && idForLookup == currentId();
+              }
+
+              @Override
+              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+              {
+                dimSelector.inspectRuntimeShape(inspector);
+              }
+            };
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+          {
+            return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            dimSelector.inspectRuntimeShape(inspector);
+          }
+
+          /**
+           * Returns the name of the current unnested value, or null when the base row has no values or the
+           * value is not in the allowList.
+           */
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (!hasCurrentValue()) {
+              return null;
+            }
+            final int id = currentId();
+            return isAllowed(id) ? lookupName(id) : null;
+          }
+
+          @Override
+          public Class<?> classOfObject()
+          {
+            return Object.class;
+          }
+
+          @Override
+          public int getValueCardinality()
+          {
+            // The ids handed out are the base dictionary ids, so the id space (and thus the cardinality)
+            // is that of the base selector even when an allowList restricts which ids appear.
+            return dimSelector.getValueCardinality();
+          }
+
+          @Nullable
+          @Override
+          public String lookupName(int id)
+          {
+            return dimSelector.lookupName(id);
+          }
+
+          @Override
+          public boolean nameLookupPossibleInAdvance()
+          {
+            return dimSelector.nameLookupPossibleInAdvance();
+          }
+
+          @Nullable
+          @Override
+          public IdLookup idLookup()
+          {
+            return dimSelector.idLookup();
+          }
+        };
+      }
+
+      /*
+      This ideally should not be called. If called, delegate to makeDimensionSelector for the output column.
+      */
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumnSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+      }
+
+      /**
+       * For the output column: an ARRAY column yields its element type, and a multi-value column yields a
+       * single-valued copy of its capabilities. Returns null when the column being unnested has no capabilities.
+       */
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(column)) {
+          return baseColumnSelectorFactory.getColumnCapabilities(column);
+        }
+        final ColumnCapabilities capabilities = baseColumnSelectorFactory.getColumnCapabilities(columnName);
+        if (capabilities == null) {
+          return null;
+        }
+        if (capabilities.isArray()) {
+          return ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
+        }
+        if (capabilities.hasMultipleValues().isTrue()) {
+          return ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
+        }
+        return capabilities;
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && !baseCursor.isDone()) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && !baseCursor.isDoneOrInterrupted()) {
+      initialize();
+    }
+    return baseCursor.isDoneOrInterrupted();
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+    needInitialization = true;
+    baseCursor.reset();
+  }
+
+  /**
+   * This initializes the unnest cursor and creates data structures
+   * to start iterating over the values to be unnested.
+   * It also resolves the allowList of UnnestDataSource into a bitset of dictionary ids when possible,
+   * and moves to the first allowed value if the very first one is not allowed.
+   */
+  private void initialize()
+  {
+    // Cleared before a possible advance() so that nothing below sees a half-initialized cursor.
+    needInitialization = false;
+    index = 0;
+    resolveAllowList();
+    indexedIntsForCurrentRow = dimSelector.getRow();
+    if (!isCurrentValueAllowed()) {
+      advance();
+    }
+  }
+
+  /**
+   * Fills allowedBitSet with the dictionary ids of the allowList values, if the selector offers an IdLookup.
+   * Values absent from the dictionary are ignored.
+   */
+  private void resolveAllowList()
+  {
+    allowedBitSet.clear();
+    final IdLookup idLookup = dimSelector.idLookup();
+    allowListResolvedToIds = hasAllowList() && idLookup != null;
+    if (allowListResolvedToIds) {
+      for (String value : allowSet) {
+        final int id = idLookup.lookupId(value);
+        if (id >= 0) {
+          allowedBitSet.set(id);
+        }
+      }
+    }
+  }
+
+  /**
+   * This advances the cursor to move to the next element to be unnested.
+   * When the last element in a row is unnested (or the row has none), it is also responsible
+   * to move the base cursor to the next row for unnesting and to point indexedIntsForCurrentRow at it.
+   */
+  private void advanceAndUpdate()
+  {
+    if (indexedIntsForCurrentRow == null || index >= indexedIntsForCurrentRow.size() - 1) {
+      index = 0;
+      if (!baseCursor.isDone()) {
+        baseCursor.advanceUninterruptibly();
+      }
+      // Always refresh: a row that had no values must not leave a stale (or null) row behind.
+      if (!baseCursor.isDone()) {
+        indexedIntsForCurrentRow = dimSelector.getRow();
+      }
+    } else {
+      ++index;
+    }
+  }
+
+  /**
+   * This decides whether the unnest cursor must keep moving because an allowList is specified
+   * and the current value is not in it.
+   *
+   * @return true if the cursor should advance again, false if it should stay on the current element
+   */
+  private boolean matchAndProceed()
+  {
+    return !baseCursor.isDone() && !isCurrentValueAllowed();
+  }
+
+  /**
+   * Returns true if there is no allowList, or if the current value is in it.
+   * A row without values is never allowed when an allowList is present.
+   */
+  private boolean isCurrentValueAllowed()
+  {
+    if (!hasAllowList()) {
+      return true;
+    }
+    return hasCurrentValue() && isAllowed(currentId());
+  }
+
+  private boolean hasAllowList()
+  {
+    return allowSet != null && !allowSet.isEmpty();
+  }
+
+  /**
+   * Returns true if the given dictionary id passes the allowList, using the resolved bitset when available
+   * and a name lookup otherwise.
+   */
+  private boolean isAllowed(int id)
+  {
+    if (!hasAllowList()) {
+      return true;
+    }
+    if (allowListResolvedToIds) {
+      return allowedBitSet.get(id);
+    }
+    return allowSet.contains(dimSelector.lookupName(id));
+  }
+
+  /**
+   * Returns true if the cursor points at an actual value of the current base row.
+   */
+  private boolean hasCurrentValue()
+  {
+    return indexedIntsForCurrentRow != null && index < indexedIntsForCurrentRow.size();
+  }
+
+  /**
+   * Returns the dictionary id of the current value; only valid when hasCurrentValue() is true.
+   */
+  private int currentId()
+  {
+    return indexedIntsForCurrentRow.get(index);
+  }
+
+  // Helper class to help in returning getRow from the output dimension selector.
+  // It always reflects the value the unnest cursor currently points at.
+  private class SingleIndexInts implements IndexedInts
+  {
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      //nothing to inspect
+    }
+
+    @Override
+    public int size()
+    {
+      // After unnest each row has a single element, unless the base row had no values at all.
+      return hasCurrentValue() ? 1 : 0;
+    }
+
+    @Override
+    public int get(int idx)
+    {
+      return currentId();
+    }
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
new file mode 100644
index 0000000000..9da6b8132c
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Optional;
+
+/**
+ * The segment reference for the Unnest Data Source.
+ * The input column name, output name and the allowSet follow from {@link
org.apache.druid.query.UnnestDataSource}
+ */
+public class UnnestSegmentReference implements SegmentReference
+{
+ private static final Logger log = new Logger(UnnestSegmentReference.class);
+
+ private final SegmentReference baseSegment;
+ private final String dimension;
+ private final String renamedOutputDimension;
+ private final LinkedHashSet<String> allowSet;
+
+ public UnnestSegmentReference(SegmentReference baseSegment, String
dimension, String outputName, LinkedHashSet<String> allowList)
+ {
+ this.baseSegment = baseSegment;
+ this.dimension = dimension;
+ this.renamedOutputDimension = outputName;
+ this.allowSet = allowList;
+ }
+
+ @Override
+ public Optional<Closeable> acquireReferences()
+ {
+ Closer closer = Closer.create();
+ try {
+ boolean acquireFailed = baseSegment.acquireReferences().map(closeable ->
{
+ closer.register(closeable);
+ return false;
+ }).orElse(true);
+
+ if (acquireFailed) {
+ CloseableUtils.closeAndWrapExceptions(closer);
+ return Optional.empty();
+ } else {
+ return Optional.of(closer);
+ }
+ }
+ catch (Throwable e) {
+ // acquireReferences is not permitted to throw exceptions.
+ CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed);
+ log.warn(e, "Exception encountered while trying to acquire reference");
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public SegmentId getId()
+ {
+ return baseSegment.getId();
+ }
+
+ @Override
+ public Interval getDataInterval()
+ {
+ return baseSegment.getDataInterval();
+ }
+
+ @Nullable
+ @Override
+ public QueryableIndex asQueryableIndex()
+ {
+ return null;
+ }
+
+ @Override
+ public StorageAdapter asStorageAdapter()
+ {
+ return new UnnestStorageAdapter(
+ baseSegment.asStorageAdapter(),
+ dimension,
+ renamedOutputDimension,
+ allowSet
+ );
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ baseSegment.close();
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
new file mode 100644
index 0000000000..f76ab89270
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.filter.AndFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+
+/**
+ * This class serves as the Storage Adapter for the Unnest Segment and is
responsible for creating the cursors
+ * If the column is dictionary encoded it creates {@link
UnnestDimensionCursor} else {@link UnnestColumnValueSelectorCursor}
+ * These cursors help navigate the segments for these cases
+ */
+public class UnnestStorageAdapter implements StorageAdapter
+{
+ private final StorageAdapter baseAdapter;
+ private final String dimensionToUnnest;
+ private final String outputColumnName;
+ private final LinkedHashSet<String> allowSet;
+
+ public UnnestStorageAdapter(
+ final StorageAdapter baseAdapter,
+ final String dimension,
+ final String outputColumnName,
+ final LinkedHashSet<String> allowSet
+ )
+ {
+ this.baseAdapter = baseAdapter;
+ this.dimensionToUnnest = dimension;
+ this.outputColumnName = outputColumnName;
+ this.allowSet = allowSet;
+ }
+
+ @Override
+ public Sequence<Cursor> makeCursors(
+ @Nullable Filter filter,
+ Interval interval,
+ VirtualColumns virtualColumns,
+ Granularity gran,
+ boolean descending,
+ @Nullable QueryMetrics<?> queryMetrics
+ )
+ {
+ Filter updatedFilter;
+ if (allowSet != null && !allowSet.isEmpty()) {
+ final InDimFilter allowListFilters;
+ allowListFilters = new InDimFilter(dimensionToUnnest, allowSet);
+ if (filter != null) {
+ updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters));
+ } else {
+ updatedFilter = allowListFilters;
+ }
+ } else {
+ updatedFilter = filter;
+ }
+ final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
+ updatedFilter,
+ interval,
+ virtualColumns,
+ gran,
+ descending,
+ queryMetrics
+ );
+
+ return Sequences.map(
+ baseCursorSequence,
+ cursor -> {
+ Objects.requireNonNull(cursor);
+ Cursor retVal = cursor;
+ ColumnCapabilities capabilities =
cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest);
+ if (capabilities != null) {
+ if
(capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue())
{
+ retVal = new UnnestDimensionCursor(
+ retVal,
+ retVal.getColumnSelectorFactory(),
+ dimensionToUnnest,
+ outputColumnName,
+ allowSet
+ );
+ } else {
+ retVal = new UnnestColumnValueSelectorCursor(
+ retVal,
+ retVal.getColumnSelectorFactory(),
+ dimensionToUnnest,
+ outputColumnName,
+ allowSet
+ );
+ }
+ } else {
+ retVal = new UnnestColumnValueSelectorCursor(
+ retVal,
+ retVal.getColumnSelectorFactory(),
+ dimensionToUnnest,
+ outputColumnName,
+ allowSet
+ );
+ }
+ return retVal;
+ }
+ );
+ }
+
+ @Override
+ public Interval getInterval()
+ {
+ return baseAdapter.getInterval();
+ }
+
+ @Override
+ public Indexed<String> getAvailableDimensions()
+ {
+ final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>();
+
+ for (String dim : baseAdapter.getAvailableDimensions()) {
+ availableDimensions.add(dim);
+ }
+ availableDimensions.add(outputColumnName);
+ return new ListIndexed<>(Lists.newArrayList(availableDimensions));
+ }
+
+ @Override
+ public Iterable<String> getAvailableMetrics()
+ {
+ return baseAdapter.getAvailableMetrics();
+ }
+
+ @Override
+ public int getDimensionCardinality(String column)
+ {
+ if (!outputColumnName.equals(column)) {
+ return baseAdapter.getDimensionCardinality(column);
+ }
+ return baseAdapter.getDimensionCardinality(dimensionToUnnest);
+ }
+
+ @Override
+ public DateTime getMinTime()
+ {
+ return baseAdapter.getMinTime();
+ }
+
+ @Override
+ public DateTime getMaxTime()
+ {
+ return baseAdapter.getMaxTime();
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMinValue(String column)
+ {
+ if (!outputColumnName.equals(column)) {
+ return baseAdapter.getMinValue(column);
+ }
+ return baseAdapter.getMinValue(dimensionToUnnest);
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMaxValue(String column)
+ {
+ if (!outputColumnName.equals(column)) {
+ return baseAdapter.getMaxValue(column);
+ }
+ return baseAdapter.getMaxValue(dimensionToUnnest);
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ if (!outputColumnName.equals(column)) {
+ return baseAdapter.getColumnCapabilities(column);
+ }
+ return baseAdapter.getColumnCapabilities(dimensionToUnnest);
+ }
+
+ @Override
+ public int getNumRows()
+ {
+ return 0;
+ }
+
+ @Override
+ public DateTime getMaxIngestedEventTime()
+ {
+ return baseAdapter.getMaxIngestedEventTime();
+ }
+
+ @Nullable
+ @Override
+ public Metadata getMetadata()
+ {
+ return baseAdapter.getMetadata();
+ }
+
+ public String getDimensionToUnnest()
+ {
+ return dimensionToUnnest;
+ }
+}
+
diff --git a/processing/src/test/java/org/apache/druid/segment/ListCursor.java
b/processing/src/test/java/org/apache/druid/segment/ListCursor.java
new file mode 100644
index 0000000000..666bc21be5
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/ListCursor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+
+/**
+ * A Cursor that iterates over a user created list.
+ * This is used to test the base cursor of an UnnestCursor.
+ * Usages can be found in tests of {@link UnnestColumnValueSelectorCursor} in
{@link UnnestColumnValueSelectorCursorTest}
+ * However this cannot help with {@link UnnestDimensionCursor}.
+ * Tests for {@link UnnestDimensionCursor} are done alongside tests for {@link
UnnestStorageAdapterTest}
+ */
+public class ListCursor implements Cursor
+{
+ List<Object> baseList;
+ private int index;
+
+ public ListCursor(List<Object> inputList)
+ {
+ this.baseList = inputList;
+ }
+
+ @Override
+ public ColumnSelectorFactory getColumnSelectorFactory()
+ {
+ return new ColumnSelectorFactory()
+ {
+ @Override
+ public DimensionSelector makeDimensionSelector(DimensionSpec
dimensionSpec)
+ {
+ return new DimensionSelector()
+ {
+ @Override
+ public IndexedInts getRow()
+ {
+ return null;
+ }
+
+ @Override
+ public ValueMatcher makeValueMatcher(@Nullable String value)
+ {
+ return null;
+ }
+
+ @Override
+ public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+ {
+ return null;
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+
+ }
+
+ @Nullable
+ @Override
+ public Object getObject()
+ {
+ if (index < baseList.size()) {
+ return baseList.get(index);
+ }
+ return null;
+ }
+
+ @Override
+ public Class<?> classOfObject()
+ {
+ return null;
+ }
+
+ @Override
+ public int getValueCardinality()
+ {
+ return 0;
+ }
+
+ @Nullable
+ @Override
+ public String lookupName(int id)
+ {
+ return null;
+ }
+
+ @Override
+ public boolean nameLookupPossibleInAdvance()
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public IdLookup idLookup()
+ {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public ColumnValueSelector makeColumnValueSelector(String columnName)
+ {
+ return new ColumnValueSelector()
+ {
+ @Override
+ public double getDouble()
+ {
+ return 0;
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLong()
+ {
+ return 0;
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public Object getObject()
+ {
+ if (index < baseList.size()) {
+ return baseList.get(index);
+ }
+ return null;
+ }
+
+ @Override
+ public Class classOfObject()
+ {
+ return null;
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public DateTime getTime()
+ {
+ return null;
+ }
+
+ @Override
+ public void advance()
+ {
+ advanceUninterruptibly();
+ BaseQuery.checkInterrupted();
+ }
+
+ @Override
+ public void advanceUninterruptibly()
+ {
+ index++;
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return index > baseList.size() - 1;
+ }
+
+ @Override
+ public boolean isDoneOrInterrupted()
+ {
+ return false;
+ }
+
+ @Override
+ public void reset()
+ {
+ index = 0;
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java
b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java
new file mode 100644
index 0000000000..b3346e1e56
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class UnnestColumnValueSelectorCursorTest extends
InitializedNullHandlingTest
+{
+ private static String OUTPUT_NAME = "unnested-column";
+ private static LinkedHashSet<String> IGNORE_SET = null;
+ private static LinkedHashSet<String> IGNORE_SET1 = new
LinkedHashSet<>(Arrays.asList("b", "f"));
+
+
+ @Test
+ public void test_list_unnest_cursors()
+ {
+ ArrayList<Object> baseList = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ List<Object> newList = new ArrayList<>();
+ for (int j = 0; j < 2; j++) {
+ newList.add(String.valueOf(i * 2 + j));
+ }
+ baseList.add(newList);
+ }
+ ListCursor listCursor = new ListCursor(baseList);
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int j = 0;
+ while (!unnestCursor.isDone()) {
+ Object colSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(colSelectorVal.toString(), String.valueOf(j));
+ j++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(j, 4);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i"),
+ Collections.singletonList("j")
+ );
+
+ List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_only_nulls()
+ {
+ List<Object> inputList = Arrays.asList(
+ Collections.singletonList(null),
+ Arrays.asList(null, null),
+ Collections.singletonList(null)
+ );
+
+ List<String> expectedResults = Arrays.asList(null, null, null, null);
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertNull(valueSelectorVal);
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 4);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_mixed_with_nulls()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b"),
+ Arrays.asList("b", "c"),
+ "d",
+ null,
+ null,
+ null
+ );
+
+ List<String> expectedResults = Arrays.asList("a", "b", "b", "c", "d",
null, null, null);
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ if (valueSelectorVal == null) {
+ Assert.assertEquals(null, expectedResults.get(k));
+ } else {
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k));
+ }
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 8);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_strings_and_no_lists()
+ {
+ List<Object> inputList = Arrays.asList("a", "b", "c", "e", "f", "g", "h",
"i", "j");
+
+ List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_strings_mixed_with_list()
+ {
+ List<Object> inputList = Arrays.asList("a", "b", "c", "e", "f",
Arrays.asList("g", "h"), "i", "j");
+
+ List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k));
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_lists_three_levels()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i"),
+ Arrays.asList("j", Arrays.asList("a", "b"))
+ );
+
+ List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j", Arrays.asList("a", "b"));
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k).toString());
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 10);
+ }
+
+ @Test
+ public void
test_list_unnest_of_unnest_cursors_user_supplied_list_three_levels()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i"),
+ Arrays.asList("j", Arrays.asList("a", "b"))
+ );
+
+ List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j", "a", "b");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor childCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ UnnestColumnValueSelectorCursor parentCursor = new
UnnestColumnValueSelectorCursor(
+ childCursor,
+ childCursor.getColumnSelectorFactory(),
+ OUTPUT_NAME,
+ "tmp-out",
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
parentCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector("tmp-out");
+ int k = 0;
+ while (!parentCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k).toString());
+ k++;
+ parentCursor.advance();
+ }
+ Assert.assertEquals(k, 11);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_with_nulls()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i", null),
+ Collections.singletonList("j")
+ );
+
+ List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", null, "j");
+
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ if (valueSelectorVal == null) {
+ Assert.assertEquals(null, expectedResults.get(k));
+ } else {
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k));
+ }
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, expectedResults.size());
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_with_dups()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "a", "a"),
+ Arrays.asList("e", "f", null, "h", "i", null),
+ Collections.singletonList("j")
+ );
+
+ List<Object> expectedResults = Arrays.asList("a", "a", "a", "e", "f",
null, "h", "i", null, "j");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ if (valueSelectorVal == null) {
+ Assert.assertEquals(null, expectedResults.get(k));
+ } else {
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k));
+ }
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 10);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_with_ignore_set()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i"),
+ Collections.singletonList("j")
+ );
+
+ List<String> expectedResults = Arrays.asList("b", "f");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET1
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ if (valueSelectorVal == null) {
+ Assert.assertEquals(null, expectedResults.get(k));
+ } else {
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k));
+ }
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 2);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_double()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6, 7, 8),
+ Collections.singletonList(9)
+ );
+
+ List<Double> expectedResults = Arrays.asList(1d, 2d, 3d, 4d, 5d, 6d, 7d,
8d, 9d);
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Double valueSelectorVal = unnestColumnValueSelector.getDouble();
+ Assert.assertEquals(valueSelectorVal, expectedResults.get(k));
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_float()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6, 7, 8),
+ Collections.singletonList(9)
+ );
+
+ List<Float> expectedResults = Arrays.asList(1f, 2f, 3f, 4f, 5f, 6f, 7f,
8f, 9f);
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Float valueSelectorVal = unnestColumnValueSelector.getFloat();
+ Assert.assertEquals(valueSelectorVal, expectedResults.get(k));
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_long()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6, 7, 8),
+ Collections.singletonList(9)
+ );
+
+ List<Long> expectedResults = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L,
9L);
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object obj = unnestColumnValueSelector.getObject();
+ Assert.assertNotNull(obj);
+ Long valueSelectorVal = unnestColumnValueSelector.getLong();
+ Assert.assertEquals(valueSelectorVal, expectedResults.get(k));
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+
+ @Test
+ public void
test_list_unnest_cursors_user_supplied_list_three_level_arrays_and_methods()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i"),
+ Arrays.asList("j", Arrays.asList("a", "b"))
+ );
+
+ List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j", Arrays.asList("a", "b"));
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k).toString());
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 10);
+ unnestCursor.reset();
+ Assert.assertFalse(unnestCursor.isDoneOrInterrupted());
+ }
+
+ @Test(expected = UOE.class)
+ public void test_list_unnest_cursors_dimSelector()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList("a", "b", "c"),
+ Arrays.asList("e", "f", "g", "h", "i"),
+ Collections.singletonList("j")
+ );
+
+ List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g",
"h", "i", "j");
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+
unnestCursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_NAME));
+ }
+
+ @Test
+ public void test_list_unnest_cursors_user_supplied_list_of_integers()
+ {
+ List<Object> inputList = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6, 7, 8),
+ Collections.singletonList(9)
+ );
+
+ List<Integer> expectedResults = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ //Create base cursor
+ ListCursor listCursor = new ListCursor(inputList);
+
+ //Create unnest cursor
+ UnnestColumnValueSelectorCursor unnestCursor = new
UnnestColumnValueSelectorCursor(
+ listCursor,
+ listCursor.getColumnSelectorFactory(),
+ "dummy",
+ OUTPUT_NAME,
+ IGNORE_SET
+ );
+ ColumnValueSelector unnestColumnValueSelector =
unnestCursor.getColumnSelectorFactory()
+
.makeColumnValueSelector(OUTPUT_NAME);
+ int k = 0;
+ while (!unnestCursor.isDone()) {
+ Object valueSelectorVal = unnestColumnValueSelector.getObject();
+ Assert.assertEquals(valueSelectorVal.toString(),
expectedResults.get(k).toString());
+ k++;
+ unnestCursor.advance();
+ }
+ Assert.assertEquals(k, 9);
+ }
+}
+
diff --git
a/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
new file mode 100644
index 0000000000..35d42b82d4
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.utils.CloseableUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
+{
+ private static Closer CLOSER;
+ private static IncrementalIndex INCREMENTAL_INDEX;
+ private static IncrementalIndexStorageAdapter
INCREMENTAL_INDEX_STORAGE_ADAPTER;
+ private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER;
+ private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1;
+ private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER2;
+ private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER3;
+ private static List<StorageAdapter> ADAPTERS;
+ private static String COLUMNNAME = "multi-string1";
+ private static String OUTPUT_COLUMN_NAME = "unnested-multi-string1";
+ private static String OUTPUT_COLUMN_NAME1 = "unnested-multi-string1-again";
+ private static LinkedHashSet<String> IGNORE_SET = new
LinkedHashSet<>(Arrays.asList("1", "3", "5"));
+
+ @BeforeClass
+ public static void setup()
+ {
+ CLOSER = Closer.create();
+ final GeneratorSchemaInfo schemaInfo =
GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+ final DataSegment dataSegment = DataSegment.builder()
+ .dataSource("foo")
+
.interval(schemaInfo.getDataInterval())
+ .version("1")
+ .shardSpec(new
LinearShardSpec(0))
+ .size(0)
+ .build();
+ final SegmentGenerator segmentGenerator = CLOSER.register(new
SegmentGenerator());
+
+ final int numRows = 2;
+ INCREMENTAL_INDEX = CLOSER.register(
+ segmentGenerator.generateIncrementalIndex(dataSegment, schemaInfo,
Granularities.HOUR, numRows)
+ );
+ INCREMENTAL_INDEX_STORAGE_ADAPTER = new
IncrementalIndexStorageAdapter(INCREMENTAL_INDEX);
+ UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter(
+ INCREMENTAL_INDEX_STORAGE_ADAPTER,
+ COLUMNNAME,
+ OUTPUT_COLUMN_NAME,
+ null
+ );
+ UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter(
+ INCREMENTAL_INDEX_STORAGE_ADAPTER,
+ COLUMNNAME,
+ OUTPUT_COLUMN_NAME,
+ IGNORE_SET
+ );
+ UNNEST_STORAGE_ADAPTER2 = new UnnestStorageAdapter(
+ UNNEST_STORAGE_ADAPTER,
+ COLUMNNAME,
+ OUTPUT_COLUMN_NAME1,
+ null
+ );
+ UNNEST_STORAGE_ADAPTER3 = new UnnestStorageAdapter(
+ UNNEST_STORAGE_ADAPTER1,
+ COLUMNNAME,
+ OUTPUT_COLUMN_NAME1,
+ IGNORE_SET
+ );
+ ADAPTERS = ImmutableList.of(
+ UNNEST_STORAGE_ADAPTER,
+ UNNEST_STORAGE_ADAPTER1,
+ UNNEST_STORAGE_ADAPTER2,
+ UNNEST_STORAGE_ADAPTER3
+ );
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {
+ });
+ }
+
+ @Test
+ public void test_group_of_unnest_adapters_methods()
+ {
+ String colName = "multi-string1";
+ for (StorageAdapter adapter : ADAPTERS) {
+ Assert.assertEquals(
+ DateTimes.of("2000-01-01T23:00:00.000Z"),
+ adapter.getMaxTime()
+ );
+ Assert.assertEquals(
+ DateTimes.of("2000-01-01T12:00:00.000Z"),
+ adapter.getMinTime()
+ );
+ adapter.getColumnCapabilities(colName);
+ Assert.assertEquals(adapter.getNumRows(), 0);
+ Assert.assertNotNull(adapter.getMetadata());
+ Assert.assertEquals(
+ DateTimes.of("2000-01-01T23:59:59.999Z"),
+ adapter.getMaxIngestedEventTime()
+ );
+ Assert.assertEquals(
+ adapter.getColumnCapabilities(colName).toColumnType(),
+
INCREMENTAL_INDEX_STORAGE_ADAPTER.getColumnCapabilities(colName).toColumnType()
+ );
+ Assert.assertEquals(((UnnestStorageAdapter)
adapter).getDimensionToUnnest(), colName);
+ }
+ }
+
+ @Test
+ public void test_group_of_unnest_adapters_column_capabilities()
+ {
+ String colName = "multi-string1";
+ List<String> columnsInTable = Arrays.asList(
+ "string1",
+ "long1",
+ "double1",
+ "float1",
+ "multi-string1",
+ OUTPUT_COLUMN_NAME
+ );
+ List<ValueType> valueTypes = Arrays.asList(
+ ValueType.STRING,
+ ValueType.LONG,
+ ValueType.DOUBLE,
+ ValueType.FLOAT,
+ ValueType.STRING,
+ ValueType.STRING
+ );
+ UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER;
+
+ for (int i = 0; i < columnsInTable.size(); i++) {
+ ColumnCapabilities capabilities =
adapter.getColumnCapabilities(columnsInTable.get(i));
+ Assert.assertEquals(capabilities.getType(), valueTypes.get(i));
+ }
+ Assert.assertEquals(adapter.getDimensionToUnnest(), colName);
+
+ }
+
+ @Test
+ public void test_unnest_adapters_basic()
+ {
+
+ Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER.makeCursors(
+ null,
+ UNNEST_STORAGE_ADAPTER.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+
+ cursorSequence.accumulate(null, (accumulated, cursor) -> {
+ ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+ DimensionSelector dimSelector =
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+ int count = 0;
+ while (!cursor.isDone()) {
+ Object dimSelectorVal = dimSelector.getObject();
+ if (dimSelectorVal == null) {
+ Assert.assertNull(dimSelectorVal);
+ }
+ cursor.advance();
+ count++;
+ }
+ /*
+ each row has 8 entries.
+ unnest 2 rows -> 16 rows after unnest
+ */
+ Assert.assertEquals(count, 16);
+ return null;
+ });
+
+ }
+
+ @Test
+ public void test_two_levels_of_unnest_adapters()
+ {
+ Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER2.makeCursors(
+ null,
+ UNNEST_STORAGE_ADAPTER2.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+
+
+ cursorSequence.accumulate(null, (accumulated, cursor) -> {
+ ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+ DimensionSelector dimSelector =
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
+ ColumnValueSelector valueSelector =
factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
+
+ int count = 0;
+ while (!cursor.isDone()) {
+ Object dimSelectorVal = dimSelector.getObject();
+ Object valueSelectorVal = valueSelector.getObject();
+ if (dimSelectorVal == null) {
+ Assert.assertNull(dimSelectorVal);
+ } else if (valueSelectorVal == null) {
+ Assert.assertNull(valueSelectorVal);
+ }
+ cursor.advance();
+ count++;
+ }
+ /*
+ each row has 8 entries.
+ unnest 2 rows -> 16 entries also the value cardinality
+ unnest of unnest -> 16*8 = 128 rows
+ */
+ Assert.assertEquals(count, 128);
+ Assert.assertEquals(dimSelector.getValueCardinality(), 16);
+ return null;
+ });
+ }
+
+ @Test
+ public void test_unnest_adapters_with_allowList()
+ {
+ final String columnName = "multi-string1";
+
+ Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+ null,
+ UNNEST_STORAGE_ADAPTER1.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+
+ cursorSequence.accumulate(null, (accumulated, cursor) -> {
+ ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+ DimensionSelector dimSelector =
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+ ColumnValueSelector valueSelector =
factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME);
+
+ int count = 0;
+ while (!cursor.isDone()) {
+ Object dimSelectorVal = dimSelector.getObject();
+ Object valueSelectorVal = valueSelector.getObject();
+ if (dimSelectorVal == null) {
+ Assert.assertNull(dimSelectorVal);
+ } else if (valueSelectorVal == null) {
+ Assert.assertNull(valueSelectorVal);
+ }
+ cursor.advance();
+ count++;
+ }
+ /*
+ each row has 8 distinct entries.
+ allowlist has 3 entries also the value cardinality
+ unnest will have 3 distinct entries
+ */
+ Assert.assertEquals(count, 3);
+ Assert.assertEquals(dimSelector.getValueCardinality(), 3);
+ return null;
+ });
+ }
+
+ @Test
+ public void test_two_levels_of_unnest_adapters_with_allowList()
+ {
+ final String columnName = "multi-string1";
+
+ Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER3.makeCursors(
+ null,
+ UNNEST_STORAGE_ADAPTER3.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+ UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER3;
+ Assert.assertEquals(adapter.getDimensionToUnnest(), columnName);
+ Assert.assertEquals(
+
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(),
+ ColumnCapabilities.Capable.TRUE
+ );
+ Assert.assertEquals(adapter.getMaxValue(columnName),
adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+ Assert.assertEquals(adapter.getMinValue(columnName),
adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+ cursorSequence.accumulate(null, (accumulated, cursor) -> {
+ ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+ DimensionSelector dimSelector =
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
+ ColumnValueSelector valueSelector =
factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
+
+ int count = 0;
+ while (!cursor.isDone()) {
+ Object dimSelectorVal = dimSelector.getObject();
+ Object valueSelectorVal = valueSelector.getObject();
+ if (dimSelectorVal == null) {
+ Assert.assertNull(dimSelectorVal);
+ } else if (valueSelectorVal == null) {
+ Assert.assertNull(valueSelectorVal);
+ }
+ cursor.advance();
+ count++;
+ }
+ /*
+ each row has 8 distinct entries.
+ allowlist has 3 entries also the value cardinality
+ unnest will have 3 distinct entries
+ unnest of that unnest will have 3*3 = 9 entries
+ */
+ Assert.assertEquals(count, 9);
+ Assert.assertEquals(dimSelector.getValueCardinality(), 3);
+ return null;
+ });
+ }
+
+ @Test
+ public void test_unnest_adapters_methods_with_allowList()
+ {
+ final String columnName = "multi-string1";
+
+ Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+ null,
+ UNNEST_STORAGE_ADAPTER1.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+ );
+ UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1;
+ Assert.assertEquals(adapter.getDimensionToUnnest(), columnName);
+ Assert.assertEquals(
+
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(),
+ ColumnCapabilities.Capable.TRUE
+ );
+ Assert.assertEquals(adapter.getMaxValue(columnName),
adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+ Assert.assertEquals(adapter.getMinValue(columnName),
adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+ cursorSequence.accumulate(null, (accumulated, cursor) -> {
+ ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+ DimensionSelector dimSelector =
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+ IdLookup idlookUp = dimSelector.idLookup();
+ Assert.assertFalse(dimSelector.isNull());
+ int[] indices = new int[]{1, 3, 5};
+ int count = 0;
+ while (!cursor.isDone()) {
+ Object dimSelectorVal = dimSelector.getObject();
+ Assert.assertEquals(idlookUp.lookupId((String) dimSelectorVal),
indices[count]);
+ // after unnest first entry in get row should equal the object
+ // and the row size will always be 1
+ Assert.assertEquals(dimSelector.getRow().get(0), indices[count]);
+ Assert.assertEquals(dimSelector.getRow().size(), 1);
+ Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME));
+ cursor.advance();
+ count++;
+ }
+ Assert.assertEquals(dimSelector.getValueCardinality(), 3);
+ Assert.assertEquals(count, 3);
+ return null;
+ });
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]