imply-cheddar commented on code in PR #13268: URL: https://github.com/apache/druid/pull/13268#discussion_r1022115086
########## processing/src/main/java/org/apache/druid/query/UnnestDataSource.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.UnnestSegmentReference; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class UnnestDataSource implements DataSource +{ + private final DataSource base; + private final String column; + private final String outputName; + private final LinkedHashSet<String> allowList; + + + private UnnestDataSource( + DataSource dataSource, + String columnName, + String outputName, + LinkedHashSet<String> allowList + ) + { + this.base = dataSource; + this.column = columnName; + this.outputName = outputName; + 
this.allowList = allowList; + } + + /** + * Create an unnest dataSource from a string condition. Review Comment: What is this comment trying to tell me? ########## processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java: ########## @@ -125,6 +126,11 @@ public static DataSourceAnalysis forDataSource(final DataSource dataSource) current = subQuery.getDataSource(); } + while (current instanceof UnnestDataSource) { Review Comment: If there is a Query of an Unnest of a Query of an Unnest, the way that you have interleaved these is not going to completely unwrap the objects as expected. This DataSourceAnalysis thing is probably another thing to move onto the DataSource object itself... Not sure if we should do that now or leave it as something to do for later though. either way, you need both conditions (check for Query and check for Unnest) on the while loop above. ########## processing/src/main/java/org/apache/druid/query/UnnestDataSource.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.UnnestSegmentReference; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class UnnestDataSource implements DataSource +{ + private final DataSource base; + private final String column; + private final String outputName; + private final LinkedHashSet<String> allowList; + + + private UnnestDataSource( + DataSource dataSource, + String columnName, + String outputName, + LinkedHashSet<String> allowList + ) + { + this.base = dataSource; + this.column = columnName; + this.outputName = outputName; + this.allowList = allowList; + } + + /** + * Create an unnest dataSource from a string condition. 
+ */ + @JsonCreator + public static UnnestDataSource create( + @JsonProperty("base") DataSource base, + @JsonProperty("column") String columnName, + @JsonProperty("outputName") String outputName, + @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList + ) + { + return new UnnestDataSource(base, columnName, outputName, allowList); + } + + @JsonProperty("base") + public DataSource getBase() + { + return base; + } + + @JsonProperty("column") + public String getColumn() + { + return column; + } + + @JsonProperty("outputName") + public String getOutputName() + { + return outputName; + } + + @JsonProperty("allowList") + public LinkedHashSet<String> getAllowList() + { + return allowList; + } + + @Override + public Set<String> getTableNames() + { + return base.getTableNames(); + } + + @Override + public List<DataSource> getChildren() + { + return ImmutableList.of(base); + } + + @Override + public DataSource withChildren(List<DataSource> children) + { + if (children.size() != 1) { + throw new IAE("Expected [1] child, got [%d]", children.size()); + } + return new UnnestDataSource(children.get(0), column, outputName, allowList); + } + + @Override + public boolean isCacheable(boolean isBroker) + { + return base.isCacheable(isBroker); + } + + @Override + public boolean isGlobal() + { + return base.isGlobal(); + } + + @Override + public boolean isConcrete() + { + return base.isConcrete(); + } + + @Override + public Function<SegmentReference, SegmentReference> createSegmentMapFunction( + Query query, + AtomicLong cpuTimeAccumulator + ) + { + return JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> { + if (column == null) { + return Function.identity(); + } else if (column.isEmpty()) { + return Function.identity(); + } else { + return baseSegment -> + new UnnestSegmentReference( + baseSegment, + column, + outputName, + allowList + ); + } + } + ); Review Comment: This code doesn't seem to be delegating to its child, do you have any tests that test 
for, e.g. nesting of these things? ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + 
LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = cursor.getColumnSelectorFactory(); + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + 
@Override + public Class classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && baseCursor.isDone() == false) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && baseCursor.isDoneOrInterrupted() == false) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + @Override + public void initialize() + { + if (columnValueSelector != null) { Review Comment: Why would columnValueSelector ever be null? ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +public class DimensionUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + + public DimensionUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) Review Comment: Here too, this object should depend on the dimensionSelector and expect that the thing that decided to call it already has a good dimension selector for it. 
########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +public class DimensionUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private 
boolean needInitialization; + + public DimensionUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory(); + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + final DimensionSpec actualDimensionSpec = dimensionSpec.withDimension(columnName); + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return dimSelector.getRow(); Review Comment: I expected to see flattening here, but I don't see any. Are you sure that this code is working as expected? There's a chance that the tests aren't actually validating this as, without code here, I don't believe that this will actually unnest the array. ########## processing/src/main/java/org/apache/druid/segment/UnnestCursor.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +/** + * UnnestCursor is an abstraction over cursors for unnest. + * There are two types of cursors : + * -- DimensionUnnestCursor which is called if the column is disctionary encoded + * -- ColumnarValueCursor otherwise + */ +public interface UnnestCursor extends Cursor Review Comment: I don't believe this interface adds anything. You don't store references to objects of this interface, the interface appears to just be trying to create definitions for the internal implementation details of the cursors you are creating. Those classes can already access their own internal private methods, so this interface isn't actually needed, just delete it. If you want to keep the javadoc, move it to the methods on the concrete classes. ########## processing/src/test/java/org/apache/druid/segment/ListCursor.java: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class ListCursor implements Cursor +{ + public static int LIMIT = 2; + List<Object> baseList; + private int index; + + public ListCursor() + { + this.index = 0; + populateList(); + } + + public ListCursor(List<Object> inputList) + { + this.baseList = inputList; + } + + void populateList() Review Comment: I think what you want is a static method that builds a ListCursor rather than a method that can get called at any point in time to mutate and changes the internals of the `ListCursor` object. ########## processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java: ########## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.generator.GeneratorBasicSchemas; +import org.apache.druid.segment.generator.GeneratorSchemaInfo; +import org.apache.druid.segment.generator.SegmentGenerator; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.apache.druid.utils.CloseableUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; + +public class UnnestStorageAdapterTest extends InitializedNullHandlingTest +{ + private static Closer CLOSER; + private static IncrementalIndex INCREMENTAL_INDEX; + private static IncrementalIndexStorageAdapter INCREMENTAL_INDEX_STORAGE_ADAPTER; + private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER; + private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1; + 
private static List<StorageAdapter> ADAPTERS; + private static String COLUMNNAME = "multi-string1"; + private static String OUTPUT_COLUMN_NAME = "unnested-multi-string1"; + private static LinkedHashSet<String> IGNORE_SET = new LinkedHashSet<>(Arrays.asList("1", "5", "100")); + + @BeforeClass + public static void setup() + { + CLOSER = Closer.create(); + final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench"); + + final DataSegment dataSegment = DataSegment.builder() + .dataSource("foo") + .interval(schemaInfo.getDataInterval()) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + final SegmentGenerator segmentGenerator = CLOSER.register(new SegmentGenerator()); + + final int numRows = 10_000; + INCREMENTAL_INDEX = CLOSER.register( + segmentGenerator.generateIncrementalIndex(dataSegment, schemaInfo, Granularities.HOUR, numRows) + ); + INCREMENTAL_INDEX_STORAGE_ADAPTER = new IncrementalIndexStorageAdapter(INCREMENTAL_INDEX); + UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter( + INCREMENTAL_INDEX_STORAGE_ADAPTER, + COLUMNNAME, + OUTPUT_COLUMN_NAME, + null + ); + UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter( + INCREMENTAL_INDEX_STORAGE_ADAPTER, + COLUMNNAME, + OUTPUT_COLUMN_NAME, + IGNORE_SET + ); + ADAPTERS = ImmutableList.of(UNNEST_STORAGE_ADAPTER); + } + + @AfterClass + public static void teardown() + { + CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> { + }); + } + + @Test + public void test_unnest_adapter_methods() + { + String colName = "multi-string1"; + for (StorageAdapter adapter : ADAPTERS) { + Assert.assertEquals( + DateTimes.of("2000-01-01T23:00:00.000Z"), + adapter.getMaxTime() + ); + Assert.assertEquals( + DateTimes.of("2000-01-01T00:00:00.000Z"), + adapter.getMinTime() + ); + adapter.getColumnCapabilities(colName); + Assert.assertEquals(adapter.getNumRows(), 0); + Assert.assertNotNull(adapter.getMetadata()); + Assert.assertEquals( + 
DateTimes.of("2000-01-01T23:59:59.999Z"), + adapter.getMaxIngestedEventTime() + ); + Assert.assertEquals( + adapter.getColumnCapabilities(colName).toColumnType(), + INCREMENTAL_INDEX_STORAGE_ADAPTER.getColumnCapabilities(colName).toColumnType() + ); + Assert.assertEquals(UNNEST_STORAGE_ADAPTER.getDimensionToUnnest(), colName); + } + } + + @Test + public void test_unnest_adapters_basic() + { + final String columnName = "multi-string1"; + + for (StorageAdapter adapter : ADAPTERS) { + Sequence<Cursor> cursorSequence = adapter.makeCursors( + null, + adapter.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + + cursorSequence.accumulate(null, (accumulated, cursor) -> { + ColumnSelectorFactory factory = cursor.getColumnSelectorFactory(); + + DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME)); + ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME); + + + while (!cursor.isDone()) { + Object dimSelectorVal = dimSelector.getObject(); + Object valueSelectorVal = valueSelector.getObject(); + if (dimSelectorVal == null) { + Assert.assertNull(dimSelectorVal); + } else if (valueSelectorVal == null) { + Assert.assertNull(valueSelectorVal); + } + cursor.advance(); + } + return null; + }); + } + } + + @Test + public void test_unnest_adapters_allowList() + { + final String columnName = "multi-string1"; + + Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors( + null, + UNNEST_STORAGE_ADAPTER1.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + + cursorSequence.accumulate(null, (accumulated, cursor) -> { + ColumnSelectorFactory factory = cursor.getColumnSelectorFactory(); + + DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME)); + ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME); + + + while (!cursor.isDone()) { + Object 
dimSelectorVal = dimSelector.getObject(); + Object valueSelectorVal = valueSelector.getObject(); + if (dimSelectorVal == null) { + Assert.assertNull(dimSelectorVal); + } else if (valueSelectorVal == null) { + Assert.assertNull(valueSelectorVal); + } + cursor.advance(); + } + return null; + }); + } + + @Test + public void test_dim_unnest_adapters() + { + final String columnName = "multi-string1"; + + Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors( + null, + UNNEST_STORAGE_ADAPTER1.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1; + Assert.assertEquals(adapter.getDimensionToUnnest(), columnName); + Assert.assertEquals( + adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(), + ColumnCapabilities.Capable.TRUE + ); + Assert.assertEquals(adapter.getMaxValue(columnName), adapter.getMaxValue(OUTPUT_COLUMN_NAME)); + Assert.assertEquals(adapter.getMinValue(columnName), adapter.getMinValue(OUTPUT_COLUMN_NAME)); + + cursorSequence.accumulate(null, (accumulated, cursor) -> { + ColumnSelectorFactory factory = cursor.getColumnSelectorFactory(); + + DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME)); + + + while (!cursor.isDone()) { + Object dimSelectorVal = dimSelector.getObject(); + if (dimSelectorVal == null) { + Assert.assertNull(dimSelectorVal); + } + cursor.advance(); + } + return null; + }); + } + + @Test + public void test_dim_unnest_adapters_methods() + { + final String columnName = "multi-string1"; + + Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors( + null, + UNNEST_STORAGE_ADAPTER1.getInterval(), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1; + Assert.assertEquals(adapter.getDimensionToUnnest(), columnName); + Assert.assertEquals( + 
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(), + ColumnCapabilities.Capable.TRUE + ); + Assert.assertEquals(adapter.getMaxValue(columnName), adapter.getMaxValue(OUTPUT_COLUMN_NAME)); + Assert.assertEquals(adapter.getMinValue(columnName), adapter.getMinValue(OUTPUT_COLUMN_NAME)); + + cursorSequence.accumulate(null, (accumulated, cursor) -> { + ColumnSelectorFactory factory = cursor.getColumnSelectorFactory(); + + DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME)); + + + while (!cursor.isDone()) { + Object dimSelectorVal = dimSelector.getObject(); + Assert.assertNotNull(dimSelector.getRow()); + Assert.assertNotNull(dimSelector.getValueCardinality()); + Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME)); + Assert.assertNotNull(dimSelector.idLookup()); + Assert.assertNotNull(dimSelector.lookupName(0)); + Assert.assertNotNull(dimSelector.defaultGetObject()); + Assert.assertFalse(dimSelector.isNull()); + if (dimSelectorVal == null) { + Assert.assertNull(dimSelectorVal); + } Review Comment: The assertions here should be updated. You should be able to know and validate the specific ValueCardinality, and this seems to always be looking up the value for the 0 index, which shouldn't be correct. If the test isn't actually walking through rows with different dictionary values, it's not really validating what we need it to. ########## processing/src/main/java/org/apache/druid/query/UnnestDataSource.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.UnnestSegmentReference; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class UnnestDataSource implements DataSource +{ + private final DataSource base; + private final String column; + private final String outputName; + private final LinkedHashSet<String> allowList; + + + private UnnestDataSource( + DataSource dataSource, + String columnName, + String outputName, + LinkedHashSet<String> allowList + ) + { + this.base = dataSource; + this.column = columnName; + this.outputName = outputName; + this.allowList = allowList; + } + + /** + * Create an unnest dataSource from a string condition. 
+ */ + @JsonCreator + public static UnnestDataSource create( + @JsonProperty("base") DataSource base, + @JsonProperty("column") String columnName, + @JsonProperty("outputName") String outputName, + @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList + ) + { + return new UnnestDataSource(base, columnName, outputName, allowList); + } + + @JsonProperty("base") + public DataSource getBase() + { + return base; + } + + @JsonProperty("column") + public String getColumn() + { + return column; + } + + @JsonProperty("outputName") + public String getOutputName() + { + return outputName; + } + + @JsonProperty("allowList") + public LinkedHashSet<String> getAllowList() + { + return allowList; + } + + @Override + public Set<String> getTableNames() + { + return base.getTableNames(); + } + + @Override + public List<DataSource> getChildren() + { + return ImmutableList.of(base); + } + + @Override + public DataSource withChildren(List<DataSource> children) + { + if (children.size() != 1) { + throw new IAE("Expected [1] child, got [%d]", children.size()); + } + return new UnnestDataSource(children.get(0), column, outputName, allowList); + } + + @Override + public boolean isCacheable(boolean isBroker) + { + return base.isCacheable(isBroker); + } + + @Override + public boolean isGlobal() + { + return base.isGlobal(); + } + + @Override + public boolean isConcrete() + { + return base.isConcrete(); + } + + @Override + public Function<SegmentReference, SegmentReference> createSegmentMapFunction( + Query query, + AtomicLong cpuTimeAccumulator + ) + { + return JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> { + if (column == null) { + return Function.identity(); + } else if (column.isEmpty()) { + return Function.identity(); + } else { + return baseSegment -> + new UnnestSegmentReference( + baseSegment, + column, + outputName, + allowList + ); + } + } + ); + + } + + @Override + public DataSource withUpdatedDataSource(DataSource newSource) + { + return null; 
Review Comment: Is this never called? If it is, my guess is that it will produce an NPE. Maybe include a comment about why it is safe to do this? ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) Review Comment: It should be safe to pass in the baseColumnSelectorFactory directly. Once you've made the decision to use this object, you should already have a good column selector factory to use. ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = cursor.getColumnSelectorFactory(); + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector 
makeDimensionSelector(DimensionSpec dimensionSpec) + { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); 
+ } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && baseCursor.isDone() == false) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && baseCursor.isDoneOrInterrupted() == false) { Review Comment: `==` on a boolean thing is a bit weird? Just `!baseCursor.isDoneOrInterrupted()`? ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = cursor.getColumnSelectorFactory(); + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); Review Comment: You are not intercepting the calls for the dimension from here, you will need to intercept them. 
########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + 
this.baseCursor = cursor; + this.baseColumSelectorFactory = cursor.getColumnSelectorFactory(); + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector); Review Comment: Why can't you reuse the object from the constructor? ########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumSelectorFactory = cursor.getColumnSelectorFactory(); + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + 
@Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime 
getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && baseCursor.isDone() == false) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && baseCursor.isDoneOrInterrupted() == false) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + @Override + public void initialize() + { + if (columnValueSelector != null) { + this.currentVal = this.columnValueSelector.getObject(); + this.unnestListForCurrentRow = new ArrayList<>(); + if (currentVal == null) { + unnestListForCurrentRow = new ArrayList<>(); + unnestListForCurrentRow.add(null); + } else { + if (currentVal instanceof List) { + unnestListForCurrentRow = (List<Object>) currentVal; + } else if (currentVal.getClass().equals(String.class)) { + unnestListForCurrentRow.add(currentVal); Review Comment: Adding more and more things to the list is going to effectively mean that you are keeping a reference to all of the values in memory as you go through the processing. This is effectively a memory leak and I'm not sure it's necessary? While it might seem ugly, I think you would be better off with a reference for a single Object and a reference for a List<> and then actually having logic that decides which one it is "iterating" over. ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +public class DimensionUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + + public DimensionUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = 
cursor.getColumnSelectorFactory(); + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + final DimensionSpec actualDimensionSpec = dimensionSpec.withDimension(columnName); + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return dimSelector.getRow(); + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return lookupName(indexedIntsForCurrentRow.get(index)).equals(value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumnSelectorFactory.makeDimensionSelector(actualDimensionSpec).inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate<String> predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseCursor.getColumnSelectorFactory() + .makeDimensionSelector(actualDimensionSpec) + .inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public Object getObject() + { + if (indexedIntsForCurrentRow != null) { + if (allowedBitSet.isEmpty()) { Review Comment: I don't see anywhere that `allowedBitSet` is getting mutated, why would it ever be non-empty? 
########## processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +public class ColumnarValueUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private int index; + private Object currentVal; + private List<Object> unnestListForCurrentRow; + private boolean needInitialization; + + public ColumnarValueUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + 
this.baseCursor = cursor; + this.baseColumSelectorFactory = cursor.getColumnSelectorFactory(); + this.columnValueSelector = this.baseColumSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Double.valueOf((String) value); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Float.valueOf((String) value); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + return Long.valueOf((String) value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class classOfObject() + 
{ + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(columnName)) { + baseColumSelectorFactory.getColumnCapabilities(column); + } + return baseColumSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && baseCursor.isDone() == false) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && baseCursor.isDoneOrInterrupted() == false) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + @Override + public void initialize() + { + if (columnValueSelector != null) { + this.currentVal = this.columnValueSelector.getObject(); + this.unnestListForCurrentRow = new ArrayList<>(); + if (currentVal == null) { + unnestListForCurrentRow = new ArrayList<>(); + unnestListForCurrentRow.add(null); + } else { + if (currentVal instanceof List) { + unnestListForCurrentRow = (List<Object>) currentVal; + } else if (currentVal.getClass().equals(String.class)) { + unnestListForCurrentRow.add(currentVal); + } + } + if (allowSet != null) { + if (!allowSet.isEmpty()) { + if (!allowSet.contains((String) unnestListForCurrentRow.get(index))) { + advance(); + } + } + } + } + needInitialization = false; + } + + @Override + public void advanceAndUpdate() Review Comment: The logic of `initialize` and the logic of `advanceAndUpdate` seem very, very similar to me. 
Is there a reason that the initialization cannot just be a pre-emptive `advanceAndUpdate`? ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +public class DimensionUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + + public DimensionUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory(); + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + final DimensionSpec 
actualDimensionSpec = dimensionSpec.withDimension(columnName); + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return dimSelector.getRow(); + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return lookupName(indexedIntsForCurrentRow.get(index)).equals(value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumnSelectorFactory.makeDimensionSelector(actualDimensionSpec).inspectRuntimeShape(inspector); Review Comment: Is there a reason we cannot reuse the object from the constructor? ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +public class DimensionUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + + public DimensionUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory(); + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + final DimensionSpec 
actualDimensionSpec = dimensionSpec.withDimension(columnName); + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return dimSelector.getRow(); + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return lookupName(indexedIntsForCurrentRow.get(index)).equals(value); Review Comment: This is going to look up and build a String for every row that is processed. It completely negates the benefits of having a dictionary and actually makes the dictionary processed stuff slower. The dictionary has already converted the string into an integer, you can look up the dictionary id and then compare against that without ever building a String, do that instead. ########## processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; + +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet<String> allowSet; + + public UnnestStorageAdapter( + final StorageAdapter baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet<String> allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + public Sequence<Cursor> makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + @Nullable QueryMetrics<?> queryMetrics + ) + { + Filter updatedFilter; + final InDimFilter allowListFilters; + if (allowSet != null && !allowSet.isEmpty()) { + allowListFilters = new InDimFilter(dimensionToUnnest, allowSet); + if (filter != null) { + updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters)); + } else { + updatedFilter = allowListFilters; + } + } else { + updatedFilter = filter; + } + final 
Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors( + updatedFilter, + interval, + virtualColumns, + gran, + descending, + queryMetrics + ); + + return Sequences.map( + baseCursorSequence, + cursor -> { + assert cursor != null; Review Comment: No asserts in code, either you actually want to validate the thing and should be checking it or you trust that the code is doing what you expect and don't need to check it. Asserts slow things down and don't actually help in production (I know that people will tell you that asserts don't run in production code, but profiling disagrees) ########## processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; + +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet<String> allowSet; + + public UnnestStorageAdapter( + final StorageAdapter baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet<String> allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + public Sequence<Cursor> makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + @Nullable QueryMetrics<?> queryMetrics + ) + { + Filter updatedFilter; + final InDimFilter allowListFilters; + if (allowSet != null && !allowSet.isEmpty()) { + allowListFilters = new InDimFilter(dimensionToUnnest, allowSet); + if (filter != null) { + updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters)); + } else { + updatedFilter = allowListFilters; + } + } else { + updatedFilter = filter; + } + final 
Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors( + updatedFilter, + interval, + virtualColumns, + gran, + descending, + queryMetrics + ); + + return Sequences.map( + baseCursorSequence, + cursor -> { + assert cursor != null; + Cursor retVal = cursor; + UnnestCursor retUnnestCursor; + ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest); + if (capabilities.isDictionaryEncoded() == ColumnCapabilities.Capable.TRUE + && capabilities.areDictionaryValuesUnique() == ColumnCapabilities.Capable.TRUE) { + retUnnestCursor = new DimensionUnnestCursor(retVal, dimensionToUnnest, outputColumnName, allowSet); + } else { + retUnnestCursor = new ColumnarValueUnnestCursor(retVal, dimensionToUnnest, outputColumnName, allowSet); + } + return retUnnestCursor; + } + ); + } + + @Override + public Interval getInterval() + { + return baseAdapter.getInterval(); + } + + @Override + public Indexed<String> getAvailableDimensions() + { + final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>(); + + for (String dim : baseAdapter.getAvailableDimensions()) { + availableDimensions.add(dim); + } + // check to see if output name provided is already + // a part of available dimensions + if (availableDimensions.contains(outputColumnName)) { + throw new IAE( + "Provided output name [%s] already exists in table to be unnested. Please use a different name.", + outputColumnName + ); + } else { + availableDimensions.add(outputColumnName); + } + return new ListIndexed<>(Lists.newArrayList(availableDimensions)); + } Review Comment: Why is it bad for the output name to already exist? ########## processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; + +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet<String> allowSet; + + public UnnestStorageAdapter( + final StorageAdapter baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet<String> allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + 
public Sequence<Cursor> makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + @Nullable QueryMetrics<?> queryMetrics + ) + { + Filter updatedFilter; + final InDimFilter allowListFilters; Review Comment: This doesn't need to be defined outside of the if? ########## processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +public class DimensionUnnestCursor implements UnnestCursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet<String> allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + + public DimensionUnnestCursor( + Cursor cursor, + String columnName, + String outputColumnName, + LinkedHashSet<String> allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory(); + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + final DimensionSpec 
actualDimensionSpec = dimensionSpec.withDimension(columnName); + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return dimSelector.getRow(); + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return lookupName(indexedIntsForCurrentRow.get(index)).equals(value); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseColumnSelectorFactory.makeDimensionSelector(actualDimensionSpec).inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate<String> predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + baseCursor.getColumnSelectorFactory() + .makeDimensionSelector(actualDimensionSpec) + .inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public Object getObject() + { + if (indexedIntsForCurrentRow != null) { + if (allowedBitSet.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + } else if (allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + } else { + return null; + } + return null; + } + + @Override + public Class<?> classOfObject() + { + return Object.class; + } + + @Override + public int getValueCardinality() + { + return 0; Review Comment: This value should be known. ########## processing/src/test/java/org/apache/druid/query/UnnestQueryRunnerTest.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import org.apache.druid.testing.InitializedNullHandlingTest; + +// TODO: Use placementish column in the QueryRunnerHelperTest +// and set up native queries +public class UnnestQueryRunnerTest extends InitializedNullHandlingTest Review Comment: Is it intentional that this test is completely empty? ########## processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; + +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet<String> allowSet; + + public UnnestStorageAdapter( + final StorageAdapter baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet<String> allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + public Sequence<Cursor> makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + @Nullable QueryMetrics<?> queryMetrics + ) + { + Filter updatedFilter; + final InDimFilter allowListFilters; + if (allowSet != null && !allowSet.isEmpty()) { + allowListFilters = new InDimFilter(dimensionToUnnest, allowSet); + if (filter != null) { + updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters)); + } else { + updatedFilter = allowListFilters; + } + } else { + updatedFilter = filter; + } + final 
Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors( + updatedFilter, + interval, + virtualColumns, + gran, + descending, + queryMetrics + ); + + return Sequences.map( + baseCursorSequence, + cursor -> { + assert cursor != null; + Cursor retVal = cursor; + UnnestCursor retUnnestCursor; + ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest); + if (capabilities.isDictionaryEncoded() == ColumnCapabilities.Capable.TRUE + && capabilities.areDictionaryValuesUnique() == ColumnCapabilities.Capable.TRUE) { + retUnnestCursor = new DimensionUnnestCursor(retVal, dimensionToUnnest, outputColumnName, allowSet); + } else { + retUnnestCursor = new ColumnarValueUnnestCursor(retVal, dimensionToUnnest, outputColumnName, allowSet); + } + return retUnnestCursor; + } + ); + } + + @Override + public Interval getInterval() + { + return baseAdapter.getInterval(); + } + + @Override + public Indexed<String> getAvailableDimensions() + { + final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>(); + + for (String dim : baseAdapter.getAvailableDimensions()) { + availableDimensions.add(dim); + } + // check to see if output name provided is already + // a part of available dimensions + if (availableDimensions.contains(outputColumnName)) { + throw new IAE( + "Provided output name [%s] already exists in table to be unnested. 
Please use a different name.", + outputColumnName + ); + } else { + availableDimensions.add(outputColumnName); + } + return new ListIndexed<>(Lists.newArrayList(availableDimensions)); + } + + @Override + public Iterable<String> getAvailableMetrics() + { + return baseAdapter.getAvailableMetrics(); + } + + @Override + public int getDimensionCardinality(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getDimensionCardinality(column); + } + return baseAdapter.getDimensionCardinality(dimensionToUnnest); + } + + @Override + public DateTime getMinTime() + { + return baseAdapter.getMinTime(); + } + + @Override + public DateTime getMaxTime() + { + return baseAdapter.getMaxTime(); + } + + @Nullable + @Override + public Comparable getMinValue(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getMinValue(column); + } + return baseAdapter.getMinValue(dimensionToUnnest); + } + + @Nullable + @Override + public Comparable getMaxValue(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getMaxValue(column); + } + return baseAdapter.getMaxValue(dimensionToUnnest); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (outputColumnName.equals(dimensionToUnnest)) { + return baseAdapter.getColumnCapabilities(column); + } + return baseAdapter.getColumnCapabilities(dimensionToUnnest); + } + + @Override + public int getNumRows() + { + return 0; Review Comment: I'm unsure if it's safe to return 0 from this... We should double check what uses this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
