cryptoe commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1157183209
##########
processing/src/main/java/org/apache/druid/frame/field/FieldWriters.java:
##########
@@ -57,10 +58,14 @@ private FieldWriters()
public static FieldWriter create(
final ColumnSelectorFactory columnSelectorFactory,
final String columnName,
- final ColumnType columnType
+ final ColumnType columnType,
+ final boolean allowNullColumnType
Review Comment:
We should mention the null handling in the Javadoc.
##########
processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java:
##########
@@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()
JodaStuff.register(this);
+ addSerializer(FramesBackedInlineDataSource.class, new
FramesBackedInlineDataSourceSerializer());
Review Comment:
Can you explain why you think adding the serializer here makes sense?
##########
processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java:
##########
@@ -451,6 +462,49 @@ public Sequence<Object[]> resultsAsArrays(
);
}
+ @Override
+ public boolean canFetchResultsAsFrames()
+ {
+ return true;
+ }
+
+ @Override
+ public Sequence<FrameSignaturePair> resultsAsFrames(
+ TimeseriesQuery query,
+ Sequence<Result<TimeseriesResultValue>> resultSequence
+ )
+ {
+ final RowSignature rowSignature = resultArraySignature(query);
Review Comment:
The same memory-limit trick discussed for the scan query tool chest applies here.
##########
processing/src/main/java/org/apache/druid/frame/field/FieldWriters.java:
##########
@@ -57,10 +58,14 @@ private FieldWriters()
public static FieldWriter create(
final ColumnSelectorFactory columnSelectorFactory,
final String columnName,
- final ColumnType columnType
+ final ColumnType columnType,
+ final boolean allowNullColumnType
)
{
if (columnType == null) {
+ if (allowNullColumnType) {
+ return makeComplexWriter(columnSelectorFactory, columnName,
NestedDataComplexTypeSerde.TYPE_NAME);
Review Comment:
```suggestion
// returning complex writer since we do not know the type of column.
return makeComplexWriter(columnSelectorFactory, columnName,
NestedDataComplexTypeSerde.TYPE_NAME);
```
##########
processing/src/main/java/org/apache/druid/frame/read/FrameReader.java:
##########
@@ -85,12 +86,17 @@ public static FrameReader create(final RowSignature
signature)
final List<FieldReader> fieldReaders = new ArrayList<>(signature.size());
for (int columnNumber = 0; columnNumber < signature.size();
columnNumber++) {
- final ColumnType columnType =
- Preconditions.checkNotNull(
- signature.getColumnType(columnNumber).orElse(null),
- "Type for column [%s]",
- signature.getColumnName(columnNumber)
- );
+ ColumnType columnType;
+ if (!allowNullTypes) {
Review Comment:
Same here: let's mention in the Javadoc how the semantics change when null
types are allowed.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +199,75 @@ public RowSignature resultArraySignature(final ScanQuery
query)
}
}
+ @Override
+ public boolean canFetchResultsAsFrames()
+ {
+ return true;
+ }
+
+ @Override
+ public Sequence<FrameSignaturePair> resultsAsFrames(
+ final ScanQuery query,
+ final Sequence<ScanResultValue> resultSequence
+ )
+ {
+ return resultSequence.map(
+ result -> {
+ final List rows = (List) result.getEvents();
+ final Function<?, Object[]> mapper = getResultFormatMapper(query);
+ final Iterable<Object[]> formattedRows = Iterables.transform(rows,
(Function) mapper);
+
+ RowBasedCursor cursor =
IterableRowsCursorHelper.getCursorFromIterable(
+ formattedRows,
+ result.getRowSignature()
+ );
+
+ FrameWriterFactory frameWriterFactory =
FrameWriters.makeFrameWriterFactory(
+ FrameType.ROW_BASED,
+ new
SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+ result.getRowSignature(),
+ new ArrayList<>(),
+ true
+ );
+
+ Frame frame;
+
+ try (final FrameWriter frameWriter =
frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+ while (!cursor.isDone()) {
+ if (!frameWriter.addSelection()) {
+ throw new
FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
+ }
+
+ cursor.advance();
+ }
+
+ frame = Frame.wrap(frameWriter.toByteArray());
+ }
+
+ return new FrameSignaturePair(frame, result.getRowSignature());
Review Comment:
Since row signatures can differ per segment, we could start a new frame only
when the row signature changes. This would greatly reduce the number of
frames.
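A minimal sketch of that grouping idea, under loud assumptions: `Batch` and `SignatureGrouping` are hypothetical stand-ins (a signature is just a list of column names, a batch is the rows of one segment), not Druid classes. It only illustrates starting a new output batch when the signature changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Druid classes.
final class SignatureGrouping
{
  static final class Batch
  {
    final List<String> signature;  // column names, standing in for RowSignature
    final List<Object[]> rows;     // rows produced for one segment

    Batch(List<String> signature, List<Object[]> rows)
    {
      this.signature = signature;
      this.rows = rows;
    }
  }

  // Merges consecutive batches that share a signature, so a new output batch
  // (which would become one frame) starts only when the signature changes.
  static List<Batch> coalesce(List<Batch> perSegment)
  {
    final List<Batch> out = new ArrayList<>();
    for (Batch b : perSegment) {
      final Batch last = out.isEmpty() ? null : out.get(out.size() - 1);
      if (last != null && last.signature.equals(b.signature)) {
        last.rows.addAll(b.rows);
      } else {
        // Copy the rows so the input batches are never mutated.
        out.add(new Batch(b.signature, new ArrayList<>(b.rows)));
      }
    }
    return out;
  }
}
```

Only adjacent batches are merged, so row order is preserved; the real change would also have to respect the frame size limit.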
##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -330,4 +330,29 @@ public Sequence<Object[]> resultsAsArrays(QueryType query,
Sequence<ResultType>
{
throw new UOE("Query type '%s' does not support returning results as
arrays", query.getType());
}
+
+ /**
+ * Converts a sequence of this query's ResultType into a sequence of {@link
FrameSignaturePair}. The array signature
+ * is the one give by {@link #resultArraySignature(Query)}.
+ *
+ * Check documentation of {@link #resultsAsArrays(Query, Sequence)} as the
behaviour of the rows represented by the
+ * frame sequence is identical.
+ *
+ * Each Frame has a separate {@link RowSignature} because for some query
types like the Scan query, every
+ * column in the final result might not be present in the individual
ResultType (and subsequently Frame). Therefore,
+ * this is done to preserve the space by not populating the column in that
particular Frame and omitting it from its
+ * signature
+ */
+ public Sequence<FrameSignaturePair> resultsAsFrames(QueryType query,
Sequence<ResultType> resultSequence)
Review Comment:
Can we return an `Optional<Sequence>` and remove the `canFetchResultsAsFrames`
call from the interface?
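A rough sketch of the shape this could take. All class names below are hypothetical and heavily simplified (a `List<String>` stands in for `Sequence<FrameSignaturePair>`); this is not the actual `QueryToolChest` API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified tool chest. Not the real QueryToolChest.
abstract class SketchToolChest
{
  // Default: this query type cannot produce frames, so return empty
  // instead of exposing a separate boolean capability method.
  Optional<List<String>> resultsAsFrames(List<String> results)
  {
    return Optional.empty();
  }
}

final class FrameCapableToolChest extends SketchToolChest
{
  @Override
  Optional<List<String>> resultsAsFrames(List<String> results)
  {
    return Optional.of(results);
  }
}

final class LegacyToolChest extends SketchToolChest
{
}

final class OptionalFramesDemo
{
  // A single call replaces the "can fetch?" check followed by the fetch.
  static String describe(SketchToolChest toolChest, List<String> results)
  {
    return toolChest.resultsAsFrames(results)
                    .map(frames -> "frames:" + frames.size())
                    .orElse("fallback to rows");
  }
}
```

With this shape the caller cannot forget the capability check, because the absence of frames is encoded in the return type.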
##########
processing/src/main/java/org/apache/druid/frame/write/FrameWriters.java:
##########
@@ -58,19 +58,30 @@ public static FrameWriterFactory makeFrameWriterFactory(
final FrameType frameType,
final MemoryAllocatorFactory allocatorFactory,
final RowSignature signature,
- final List<KeyColumn> sortColumns
+ final List<KeyColumn> sortColumns,
+ final boolean allowNullColumnTypes
)
{
switch (Preconditions.checkNotNull(frameType, "frameType")) {
case COLUMNAR:
- return new ColumnarFrameWriterFactory(allocatorFactory, signature,
sortColumns);
+ return new ColumnarFrameWriterFactory(allocatorFactory, signature,
sortColumns, allowNullColumnTypes);
case ROW_BASED:
- return new RowBasedFrameWriterFactory(allocatorFactory, signature,
sortColumns);
+ return new RowBasedFrameWriterFactory(allocatorFactory, signature,
sortColumns, allowNullColumnTypes);
default:
throw new ISE("Unrecognized frame type [%s]", frameType);
}
}
+ public static FrameWriterFactory makeFrameWriterFactory(
Review Comment:
This method seems weird. You can call the base method directly.
##########
processing/src/main/java/org/apache/druid/query/FramesBackedInlineDataSourceSerializer.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Serializes {@link FramesBackedInlineDataSource} to the representation of
{@link IterableBackedInlineDataSource}
+ * so that the servers' on wire transfer data doesn't change. {@link
FramesBackedInlineDataSource} is currently limited
+ * to the brokers only and therefore this aids in conversion of the object to
a representation that the data servers
+ * can recognize
+ */
+public class FramesBackedInlineDataSourceSerializer extends
StdSerializer<FramesBackedInlineDataSource>
+{
+ public FramesBackedInlineDataSourceSerializer()
+ {
+ super(FramesBackedInlineDataSource.class);
+ }
+
+ @Override
+ public void serialize(FramesBackedInlineDataSource value, JsonGenerator jg,
SerializerProvider serializers)
+ throws IOException
+ {
+ jg.writeStartObject();
+ jg.writeStringField("type", "inline");
+
+ RowSignature rowSignature = value.getRowSignature();
+ jg.writeObjectField("columnNames", rowSignature.getColumnNames());
+ List<ColumnType> columnTypes = IntStream.range(0, rowSignature.size())
+ .mapToObj(i ->
rowSignature.getColumnType(i).orElse(null))
+ .collect(Collectors.toList());
+ jg.writeObjectField("columnTypes", columnTypes);
+
+ jg.writeArrayFieldStart("rows");
+
+ value.getRowsAsSequence().forEach(row -> {
+ try {
+ JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, row);
+ }
+ catch (IOException e) {
+ // Do nothing, shouldn't be possible for now
Review Comment:
I think we should propagate the exceptions forward.
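One possible way to do that from inside a `forEach` lambda, sketched with hypothetical names (`PropagateIo`, `RowWriter`) and a plain `List` in place of the Druid `Sequence`: wrap the checked exception in `java.io.UncheckedIOException` inside the lambda and unwrap it outside, so `serialize` still throws `IOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical helper for illustration; not part of the PR.
final class PropagateIo
{
  interface RowWriter
  {
    void write(Object[] row) throws IOException;
  }

  static void writeAll(List<Object[]> rows, RowWriter writer) throws IOException
  {
    try {
      rows.forEach(row -> {
        try {
          writer.write(row);
        }
        catch (IOException e) {
          // The lambda cannot throw a checked exception, so tunnel it out.
          throw new UncheckedIOException(e);
        }
      });
    }
    catch (UncheckedIOException e) {
      // Restore the original checked exception for the caller.
      throw e.getCause();
    }
  }
}
```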
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +199,75 @@ public RowSignature resultArraySignature(final ScanQuery
query)
}
}
+ @Override
+ public boolean canFetchResultsAsFrames()
+ {
+ return true;
+ }
+
+ @Override
+ public Sequence<FrameSignaturePair> resultsAsFrames(
+ final ScanQuery query,
+ final Sequence<ScanResultValue> resultSequence
+ )
+ {
+ return resultSequence.map(
Review Comment:
We should get the memory limit on line 213 and use it to initialize a
remaining-quota variable.
The remaining quota should be passed in on line 227, and whatever the frame
uses should be subtracted from the remaining quota on line 246.
This way we stop generating frames as soon as the memory limit is hit.
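A minimal sketch of the bookkeeping, with assumptions stated up front: `QuotaSketch` is a hypothetical name, a `byte[]` stands in for a serialized frame, and a plain loop replaces the lazy `Sequence.map` (inside a lambda the counter would need a mutable holder such as `AtomicLong`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the remaining-quota idea; not Druid code.
final class QuotaSketch
{
  // Turns each batch into a "frame" and charges its size against one shared budget.
  static List<byte[]> toFrames(List<byte[]> batches, long memoryLimitBytes)
  {
    long remaining = memoryLimitBytes;  // initialized once, before any frame is built
    final List<byte[]> frames = new ArrayList<>();
    for (byte[] batch : batches) {
      if (batch.length > remaining) {
        throw new IllegalStateException(
            "Memory limit of " + memoryLimitBytes + " bytes exceeded"
        );
      }
      frames.add(batch.clone());
      remaining -= batch.length;        // subtract what this frame used
    }
    return frames;
  }
}
```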
##########
processing/src/main/java/org/apache/druid/query/FramesBackedInlineDataSource.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Represents an inline datasource where the rows are embedded within the
DataSource object itself.
+ *
+ * The rows are backed by a sequence of {@link FrameSignaturePair}, which
contain the Frame representation of the rows
+ * represented by the datasource. {@link #getRowsAsList()} and {@link
#getRowsAsSequence()} return the iterables which
+ * read the rows that are encapsulated in the frames.
+ *
+ * Note that the signature of the datasource can be different from the
signatures of the constituent frames that it
+ * consists of. While fetching the iterables, it is the job of this class to
make sure that the rows correspond to the
+ * {@link #rowSignature}. For frames that donot contain the columns present in
the {@link #rowSignature}, they are
+ * populated with {@code null}.
+ */
+public class FramesBackedInlineDataSource implements DataSource
+{
+
+ final Sequence<FrameSignaturePair> frames;
+ final RowSignature rowSignature;
+
+ public FramesBackedInlineDataSource(
+ Sequence<FrameSignaturePair> frames,
+ RowSignature rowSignature
+ )
+ {
+ this.frames = frames;
+ this.rowSignature = rowSignature;
+ }
+
+ public Sequence<FrameSignaturePair> getFrames()
+ {
+ return frames;
+ }
+
+ public RowSignature getRowSignature()
+ {
+ return rowSignature;
+ }
+
+ public List<Object[]> getRowsAsList()
Review Comment:
Can this be a private method?
##########
processing/src/main/java/org/apache/druid/query/FramesBackedInlineDataSource.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Represents an inline datasource where the rows are embedded within the
DataSource object itself.
+ *
+ * The rows are backed by a sequence of {@link FrameSignaturePair}, which
contain the Frame representation of the rows
+ * represented by the datasource. {@link #getRowsAsList()} and {@link
#getRowsAsSequence()} return the iterables which
+ * read the rows that are encapsulated in the frames.
+ *
+ * Note that the signature of the datasource can be different from the
signatures of the constituent frames that it
+ * consists of. While fetching the iterables, it is the job of this class to
make sure that the rows correspond to the
+ * {@link #rowSignature}. For frames that donot contain the columns present in
the {@link #rowSignature}, they are
+ * populated with {@code null}.
+ */
+public class FramesBackedInlineDataSource implements DataSource
+{
+
+ final Sequence<FrameSignaturePair> frames;
+ final RowSignature rowSignature;
+
+ public FramesBackedInlineDataSource(
+ Sequence<FrameSignaturePair> frames,
+ RowSignature rowSignature
+ )
+ {
+ this.frames = frames;
+ this.rowSignature = rowSignature;
+ }
+
+ public Sequence<FrameSignaturePair> getFrames()
+ {
+ return frames;
+ }
+
+ public RowSignature getRowSignature()
+ {
+ return rowSignature;
+ }
+
+ public List<Object[]> getRowsAsList()
+ {
+ List<Object[]> frameRows = new ArrayList<>();
+
+ final Sequence<Cursor> cursorSequence = frames
+ .flatMap(
+ frameSignaturePair -> {
Review Comment:
We can extract this into a method, since getRowsAsSequence also uses the same
code.
##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -163,12 +169,15 @@ public <T> QueryRunner<T>
getQueryRunnerForIntervals(Query<T> query, Iterable<In
final DataSource freeTradeDataSource =
globalizeIfPossible(newQuery.getDataSource());
// do an inlining dry run to see if any inlining is necessary, without
actually running the queries.
final int maxSubqueryRows =
query.context().getMaxSubqueryRows(serverConfig.getMaxSubqueryRows());
+ final long maxSubqueryMemory =
query.context().getMaxSubqueryMemoryBytes(serverConfig.getMaxSubqueryMemory());
Review Comment:
Let's leave the server property undocumented, since setting it would send all
queries through the new query path.
We should also benchmark the new query path.
##########
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java:
##########
@@ -548,6 +561,46 @@ public Sequence<Object[]> resultsAsArrays(TopNQuery query,
Sequence<Result<TopNR
);
}
+ @Override
+ public boolean canFetchResultsAsFrames()
+ {
+ return true;
+ }
+
+ @Override
+ public Sequence<FrameSignaturePair> resultsAsFrames(TopNQuery query,
Sequence<Result<TopNResultValue>> resultSequence)
+ {
+ final RowSignature rowSignature = resultArraySignature(query);
Review Comment:
Also mention the number of frames returned for each query type.
##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +199,75 @@ public RowSignature resultArraySignature(final ScanQuery
query)
}
}
+ @Override
+ public boolean canFetchResultsAsFrames()
+ {
+ return true;
+ }
+
+ @Override
+ public Sequence<FrameSignaturePair> resultsAsFrames(
+ final ScanQuery query,
+ final Sequence<ScanResultValue> resultSequence
+ )
+ {
+ return resultSequence.map(
+ result -> {
+ final List rows = (List) result.getEvents();
+ final Function<?, Object[]> mapper = getResultFormatMapper(query);
+ final Iterable<Object[]> formattedRows = Iterables.transform(rows,
(Function) mapper);
+
+ RowBasedCursor cursor =
IterableRowsCursorHelper.getCursorFromIterable(
+ formattedRows,
+ result.getRowSignature()
+ );
+
+ FrameWriterFactory frameWriterFactory =
FrameWriters.makeFrameWriterFactory(
+ FrameType.ROW_BASED,
+ new
SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+ result.getRowSignature(),
+ new ArrayList<>(),
+ true
+ );
+
+ Frame frame;
+
+ try (final FrameWriter frameWriter =
frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+ while (!cursor.isDone()) {
+ if (!frameWriter.addSelection()) {
+ throw new
FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
+ }
+
+ cursor.advance();
+ }
+
+ frame = Frame.wrap(frameWriter.toByteArray());
+ }
+
+ return new FrameSignaturePair(frame, result.getRowSignature());
Review Comment:
We are making one frame per result sequence, and each result sequence
represents one segment. This does not seem very scalable.
Let's leave a note here so that we can come back to this in a future PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]