cryptoe commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1168626818


##########
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java:
##########
@@ -67,4 +74,45 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
       );
     }
   }
+
+  /**
+   * Writes a {@link Cursor} to a {@link Frame}. This method iterates over the rows of the cursor, and writes the columns
+   * to the frame.
+   * @param cursor Cursor to write to the frame
+   * @param frameWriterFactory Frame writer factory to write to the frame.
+   *                           Determines the signature of the rows that are written to the frames
+   * @param memoryLimitBytes Limit in bytes, if one needs to be enforced while converting the cursor to the frame. If adding
+   *                         a row causes the frame size to exceed this limit, we throw a {@link ResourceLimitExceededException}
+   */
+  public static Frame cursorToFrame(
+      Cursor cursor,
+      FrameWriterFactory frameWriterFactory,
+      @Nullable Long memoryLimitBytes
+  )
+  {
+    Frame frame;
+
+    try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+      while (!cursor.isDone()) {
+        if (!frameWriter.addSelection()) {
+          throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());

Review Comment:
   This doesn't seem correct. Maybe you are adding too many rows to the frame, and that's why the frame runs out of space? In that case `FrameRowTooLargeException` would be misleading. Maybe throw a new exception type?
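A minimal, self-contained sketch of the distinction raised above, assuming the writer can report how many rows it already holds: if a write fails on an empty frame, the single row is too large; otherwise the frame is simply full. `SketchFrameWriter`, `BoundedSketchFrameWriter`, `FrameFullException` and `RowTooLargeException` are hypothetical stand-ins, not Druid classes, and row sizes are modelled as plain integers.

```java
import java.util.List;

// Hypothetical stand-in for a frame writer: addRow() returns false when the row does not fit.
interface SketchFrameWriter {
  boolean addRow(int rowSizeBytes);

  int getNumRows();
}

class BoundedSketchFrameWriter implements SketchFrameWriter {
  private final int capacityBytes;
  private int usedBytes = 0;
  private int numRows = 0;

  BoundedSketchFrameWriter(int capacityBytes) {
    this.capacityBytes = capacityBytes;
  }

  @Override
  public boolean addRow(int rowSizeBytes) {
    if (usedBytes + rowSizeBytes > capacityBytes) {
      return false;
    }
    usedBytes += rowSizeBytes;
    numRows++;
    return true;
  }

  @Override
  public int getNumRows() {
    return numRows;
  }
}

// Hypothetical exception: earlier rows filled the frame.
class FrameFullException extends RuntimeException {
  FrameFullException(int numRows) {
    super("Frame is full after [" + numRows + "] rows");
  }
}

// Hypothetical exception: one row alone cannot fit in an empty frame.
class RowTooLargeException extends RuntimeException {
  RowTooLargeException(int capacityBytes) {
    super("A single row does not fit in a frame of [" + capacityBytes + "] bytes");
  }
}

class CursorToFrameSketch {
  // Writes all rows into one frame and reports which kind of failure occurred, if any.
  static int writeAll(List<Integer> rowSizes, int capacityBytes) {
    SketchFrameWriter writer = new BoundedSketchFrameWriter(capacityBytes);
    for (int rowSize : rowSizes) {
      if (!writer.addRow(rowSize)) {
        if (writer.getNumRows() == 0) {
          // Even an empty frame could not hold this row.
          throw new RowTooLargeException(capacityBytes);
        }
        // Earlier rows used up the space; this row might fit in a fresh frame.
        throw new FrameFullException(writer.getNumRows());
      }
    }
    return writer.getNumRows();
  }
}
```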



##########
processing/src/main/java/org/apache/druid/frame/read/FrameReader.java:
##########
@@ -85,12 +88,17 @@ public static FrameReader create(final RowSignature signature)
     final List<FieldReader> fieldReaders = new ArrayList<>(signature.size());
 
     for (int columnNumber = 0; columnNumber < signature.size(); columnNumber++) {
-      final ColumnType columnType =
-          Preconditions.checkNotNull(
-              signature.getColumnType(columnNumber).orElse(null),
-              "Type for column [%s]",
-              signature.getColumnName(columnNumber)
-          );
+      ColumnType columnType;
+      if (!allowNullTypes) {
+        columnType =
+            Preconditions.checkNotNull(
+                signature.getColumnType(columnNumber).orElse(null),
+                "Type for column [%s]",
+                signature.getColumnName(columnNumber)
+            );
+      } else {
+        columnType = signature.getColumnType(columnNumber).orElse(ColumnType.NESTED_DATA);

Review Comment:
   Can you please add a comment explaining why the type defaults to `NESTED_DATA` here?



##########
processing/src/main/java/org/apache/druid/query/FramesBackedInlineDataSource.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Represents an inline datasource where the rows are embedded within the DataSource object itself.
+ * <p>
+ * The rows are backed by a sequence of {@link FrameSignaturePair}s, which contain the Frame representation of the rows
+ * represented by the datasource. {@link #getRowsAsList()} and {@link #getRowsAsSequence()} return the iterables which
+ * read the rows that are encapsulated in the frames.
+ * <p>
+ * Note that the signature of the datasource can be different from the signatures of the constituent frames that it
+ * consists of. While fetching the iterables, it is the job of this class to make sure that the rows correspond to the
+ * {@link #rowSignature}. If a frame does not contain a column present in the {@link #rowSignature}, that column is
+ * populated with {@code null}.
+ */
+public class FramesBackedInlineDataSource implements DataSource
+{
+
+  final List<FrameSignaturePair> frames;
+  final RowSignature rowSignature;
+
+  public FramesBackedInlineDataSource(
+      List<FrameSignaturePair> frames,
+      RowSignature rowSignature
+  )
+  {
+    this.frames = frames;
+    this.rowSignature = rowSignature;
+  }
+
+  public List<FrameSignaturePair> getFrames()
+  {
+    return frames;
+  }
+
+  public RowSignature getRowSignature()
+  {
+    return rowSignature;
+  }
+
+  private List<Object[]> getRowsAsList()
+  {
+    List<Object[]> frameRows = new ArrayList<>();
+    Yielder<Object[]> rowsYielder = Yielders.each(getRowsAsSequence());
+    while (!rowsYielder.isDone()) {
+      Object[] row = rowsYielder.get();
+      frameRows.add(row);
+      rowsYielder = rowsYielder.next(null);
+    }
+    return frameRows;
+  }
+
+  public Sequence<Object[]> getRowsAsSequence()
+  {
+
+    final Sequence<Cursor> cursorSequence =
+        Sequences.simple(frames)
+                 .flatMap(
+                     frameSignaturePair -> {
+                       Frame frame = frameSignaturePair.getFrame();
+                       RowSignature frameSignature = frameSignaturePair.getRowSignature();
+                       FrameReader frameReader = FrameReader.create(frameSignature, true);
+                       return new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
+                           .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+                     }
+                 );
+
+    return cursorSequence.flatMap(
+        (cursor) -> {
+          final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+          final List<BaseObjectColumnValueSelector> selectors = rowSignature
+              .getColumnNames()
+              .stream()
+              .map(columnSelectorFactory::makeColumnValueSelector)
+              .collect(Collectors.toList());
+
+          return Sequences.simple(
+              () -> new Iterator<Object[]>()
+              {
+                @Override
+                public boolean hasNext()
+                {
+                  return !cursor.isDone();
+                }
+
+                @Override
+                public Object[] next()
+                {
+
+                  Object[] row = new Object[rowSignature.size()];
+                  for (int i = 0; i < rowSignature.size(); ++i) {
+                    row[i] = selectors.get(i).getObject();
+                  }
+
+                  cursor.advance();
+
+                  return row;
+                }
+              }
+          );
+        }
+    );
+  }
+
+  @Override
+  public Set<String> getTableNames()
+  {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public List<DataSource> getChildren()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public DataSource withChildren(List<DataSource> children)
+  {
+    if (!children.isEmpty()) {
+      throw new IAE("Cannot accept children");
+    }
+
+    return this;
+  }
+
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    return false;
+  }
+
+  @Override
+  public boolean isGlobal()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isConcrete()
+  {
+    return true;
+  }
+
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+  {
+    return Function.identity();
+  }
+
+  @Override
+  public DataSource withUpdatedDataSource(DataSource newSource)
+  {
+    return newSource;
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return null;
+  }
+
+  @Override
+  public DataSourceAnalysis getAnalysis()
+  {
+    return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+  }
+
+  public InlineDataSource toIterableBackedInlineDataSource()

Review Comment:
   Please document where this method is used and its limitations, such as materializing the entire result set, so that callers are aware of when they are making this expensive call.
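A hypothetical sketch of what such a conversion involves, using plain lists instead of Druid frames: each frame's rows are projected onto the datasource's column order (missing columns become `null`, as the class Javadoc describes) and then collected into one list, so memory grows with the total row count. `RowProjectionSketch` and its methods are illustrative only; signatures are modelled as lists of column names.

```java
import java.util.ArrayList;
import java.util.List;

class RowProjectionSketch {
  // Reorders one row from its frame's column order into the target column order,
  // filling null for any target column the frame does not have.
  static Object[] project(List<String> frameColumns, Object[] frameRow, List<String> targetColumns) {
    Object[] out = new Object[targetColumns.size()];
    for (int i = 0; i < targetColumns.size(); i++) {
      int idx = frameColumns.indexOf(targetColumns.get(i));
      out[i] = idx >= 0 ? frameRow[idx] : null;
    }
    return out;
  }

  // Eagerly materializes every row of every frame. The returned list holds all rows at once,
  // which is the cost a caller pays when converting to an iterable-backed representation.
  static List<Object[]> materialize(
      List<List<String>> frameSignatures,
      List<List<Object[]>> frameRows,
      List<String> targetColumns
  )
  {
    List<Object[]> all = new ArrayList<>();
    for (int f = 0; f < frameRows.size(); f++) {
      for (Object[] row : frameRows.get(f)) {
        all.add(project(frameSignatures.get(f), row, targetColumns));
      }
    }
    return all;
  }
}
```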



##########
processing/src/main/java/org/apache/druid/query/scan/ConcatCursor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+/**
+ * Combines multiple cursors and iterates over them. It skips over the empty cursors.
+ */
+public class ConcatCursor implements Cursor
+{
+
+  private final List<Cursor> cursors;
+  private int currentCursor;
+
+  public ConcatCursor(
+      List<Cursor> cursors
+  )
+  {
+    this.cursors = cursors;
+    currentCursor = 0;
+    skipEmptyCursors();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return cursors.get(currentCursor).getColumnSelectorFactory();
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return cursors.get(currentCursor).getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    if (currentCursor < cursors.size()) {
+      cursors.get(currentCursor).advance();
+      if (cursors.get(currentCursor).isDone()) {
+        ++currentCursor;

Review Comment:
   I think this logic is needed in more than one place. Maybe extract it into a common method, e.g. `moveCursorIfDoneToNextNonEmpty()`.
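A sketch of the suggested refactor, assuming a simplified cursor interface: one private helper moves past exhausted cursors and is called from both the constructor and `advance()`. `SimpleCursor`, `ListSimpleCursor` and `ConcatSimpleCursor` are hypothetical and omit most of Druid's `Cursor` methods (selectors, `getTime()`, `reset()`, and so on).

```java
import java.util.List;

// Hypothetical minimal cursor interface.
interface SimpleCursor {
  void advance();

  boolean isDone();

  Object get();
}

class ListSimpleCursor implements SimpleCursor {
  private final List<?> values;
  private int pos = 0;

  ListSimpleCursor(List<?> values) {
    this.values = values;
  }

  @Override
  public void advance() {
    pos++;
  }

  @Override
  public boolean isDone() {
    return pos >= values.size();
  }

  @Override
  public Object get() {
    return values.get(pos);
  }
}

class ConcatSimpleCursor implements SimpleCursor {
  private final List<SimpleCursor> cursors;
  private int currentCursor = 0;

  ConcatSimpleCursor(List<SimpleCursor> cursors) {
    this.cursors = cursors;
    moveCursorIfDoneToNextNonEmpty();
  }

  // Shared helper: skips every exhausted cursor, so the concat cursor always points at a
  // cursor with a current row, or past the end of the list.
  private void moveCursorIfDoneToNextNonEmpty() {
    while (currentCursor < cursors.size() && cursors.get(currentCursor).isDone()) {
      currentCursor++;
    }
  }

  @Override
  public void advance() {
    if (currentCursor < cursors.size()) {
      cursors.get(currentCursor).advance();
      moveCursorIfDoneToNextNonEmpty();
    }
  }

  @Override
  public boolean isDone() {
    return currentCursor >= cursors.size();
  }

  @Override
  public Object get() {
    return cursors.get(currentCursor).get();
  }
}
```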



##########
processing/src/test/java/org/apache/druid/query/scan/ConcatCursorTest.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.ListCursor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ConcatCursorTest
+{
+  @Test
+  public void testCursor()
+  {
+    Cursor dummyCursor1 = new ListCursor(new ArrayList<>());
+    Cursor cursor1 = new ListCursor(ImmutableList.of("a", "b"));
+    Cursor dummyCursor2 = new ListCursor(new ArrayList<>());
+    Cursor cursor2 = new ListCursor(ImmutableList.of("c", "d"));
+    Cursor dummyCursor3 = new ListCursor(new ArrayList<>());
+
+    Cursor concatCursor = new ConcatCursor(ImmutableList.of(
+        dummyCursor1,
+        cursor1,
+        dummyCursor2,
+        cursor2,
+        dummyCursor3
+    ));
+
+    List<Object> tempList = new ArrayList<>();
+    // Initial iteration
+    while (!concatCursor.isDone()) {
+      tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject());

Review Comment:
   Also assert on the iteration count. 



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +203,112 @@ public RowSignature resultArraySignature(final ScanQuery query)
     }
   }
 
+  /**
+   * This batches the fetched {@link ScanResultValue}s which have similar signatures and are consecutive. In the best case
+   * it would return a single frame, and in the worst case, it would return as many frames as the number of results
+   * passed. Note: Batching requires all the frames to be materialized before they are propagated further, and this might
+   * be improved.
+   */
+  @Override
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
+      final ScanQuery query,
+      final Sequence<ScanResultValue> resultSequence,
+      @Nullable Long memoryLimitBytes
+  )
+  {
+    final AtomicLong memoryLimitAccumulator = memoryLimitBytes != null ? new AtomicLong(memoryLimitBytes) : null;
+    Yielder<ScanResultValue> yielder = Yielders.each(resultSequence);
+    RowSignature prevSignature = null;
+    List<Cursor> unwrittenCursors = null;
+    List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();

Review Comment:
   All of this is prematerialized. We need to make this lazy: each call to `next()` on the iterable should return exactly one frame, and that frame should be generated lazily.
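A minimal sketch of the lazy shape described above, using plain Java types: each `next()` pulls results only until the signature changes and returns exactly one batch, keeping at most one result of look-ahead. `SketchResult` stands in for `ScanResultValue` and `SketchBatch` for a written frame; both are hypothetical, and a real version would presumably write each batch to a frame (and apply the memory limit) inside `next()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

// Hypothetical stand-in for a ScanResultValue: a signature plus its rows.
final class SketchResult {
  final String signature;
  final List<Object[]> rows;

  SketchResult(String signature, List<Object[]> rows) {
    this.signature = signature;
    this.rows = rows;
  }
}

// One output batch: all rows from a run of consecutive results sharing a signature.
final class SketchBatch {
  final String signature;
  final List<Object[]> rows = new ArrayList<>();

  SketchBatch(String signature) {
    this.signature = signature;
  }
}

final class LazyBatchingIterator implements Iterator<SketchBatch> {
  private final Iterator<SketchResult> input;
  private SketchResult pending; // first result of the next batch, already pulled from input

  LazyBatchingIterator(Iterator<SketchResult> input) {
    this.input = input;
    this.pending = input.hasNext() ? input.next() : null;
  }

  @Override
  public boolean hasNext() {
    return pending != null;
  }

  @Override
  public SketchBatch next() {
    if (pending == null) {
      throw new NoSuchElementException();
    }
    SketchBatch batch = new SketchBatch(pending.signature);
    batch.rows.addAll(pending.rows);
    pending = null;
    // Consume input only until the signature changes; that result starts the next batch.
    while (input.hasNext()) {
      SketchResult r = input.next();
      if (Objects.equals(r.signature, batch.signature)) {
        batch.rows.addAll(r.rows);
      } else {
        pending = r;
        break;
      }
    }
    return batch;
  }
}
```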



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +203,112 @@ public RowSignature resultArraySignature(final ScanQuery query)
     }
   }
 
+  /**
+   * This batches the fetched {@link ScanResultValue}s which have similar signatures and are consecutive. In the best case
+   * it would return a single frame, and in the worst case, it would return as many frames as the number of results

Review Comment:
   ```suggestion
      * it would return a single frame, and in the worst case, it would return as many frames as the number of {@link ScanResultValue}s
   ```



##########
processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java:
##########
@@ -187,11 +203,112 @@ public RowSignature resultArraySignature(final ScanQuery query)
     }
   }
 
+  /**
+   * This batches the fetched {@link ScanResultValue}s which have similar signatures and are consecutive. In the best case
+   * it would return a single frame, and in the worst case, it would return as many frames as the number of results
+   * passed. Note: Batching requires all the frames to be materialized before they are propagated further, and this might
+   * be improved.
+   */
+  @Override
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
+      final ScanQuery query,
+      final Sequence<ScanResultValue> resultSequence,
+      @Nullable Long memoryLimitBytes
+  )
+  {
+    final AtomicLong memoryLimitAccumulator = memoryLimitBytes != null ? new AtomicLong(memoryLimitBytes) : null;
+    Yielder<ScanResultValue> yielder = Yielders.each(resultSequence);
+    RowSignature prevSignature = null;
+    List<Cursor> unwrittenCursors = null;
+    List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();

Review Comment:
   Can we make this `final`?



##########
processing/src/test/java/org/apache/druid/query/scan/ConcatCursorTest.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.scan;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.ListCursor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ConcatCursorTest
+{
+  @Test
+  public void testCursor()
+  {
+    Cursor dummyCursor1 = new ListCursor(new ArrayList<>());

Review Comment:
   Please add test cases for:
   * only empty cursors
   * an empty cursor at the end only
   * an empty cursor at the start only
   * two consecutive empty cursors in each of the three positions: start, middle, and end

   Add the same set of test cases for `advanceUninterruptibly()`.

   Maybe use parameterized tests.
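A stdlib-only sketch of the requested case table, checking both the rows produced and the iteration count. In the real test, JUnit 4's `Parameterized` runner could supply these cases, and the same table could drive a variant that uses `advanceUninterruptibly()`. The `concat` iterator here is a hypothetical stand-in for `ConcatCursor`, not the class under test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ConcatCaseTableSketch {
  // Hypothetical concatenating iterator that skips empty inputs, standing in for ConcatCursor.
  static <T> Iterator<T> concat(List<List<T>> parts) {
    return new Iterator<T>() {
      private int part = 0;
      private int pos = 0;

      private void skipExhausted() {
        while (part < parts.size() && pos >= parts.get(part).size()) {
          part++;
          pos = 0;
        }
      }

      @Override
      public boolean hasNext() {
        skipExhausted();
        return part < parts.size();
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return parts.get(part).get(pos++);
      }
    };
  }

  static final List<String> E = Collections.emptyList();
  static final List<String> AB = Arrays.asList("a", "b");
  static final List<String> CD = Arrays.asList("c", "d");

  // One entry per review bullet; each entry is the list of inputs to concatenate.
  static List<List<List<String>>> cases() {
    return Arrays.asList(
        Arrays.asList(E, E, E),       // only empty inputs
        Arrays.asList(AB, CD, E),     // empty at the end only
        Arrays.asList(E, AB, CD),     // empty at the start only
        Arrays.asList(E, E, AB, CD),  // two consecutive empties at the start
        Arrays.asList(AB, E, E, CD),  // two consecutive empties in the middle
        Arrays.asList(AB, CD, E, E)   // two consecutive empties at the end
    );
  }

  // Returns how many cases produced the wrong rows or the wrong iteration count.
  static int failures() {
    int failures = 0;
    for (List<List<String>> inputs : cases()) {
      List<String> expected = new ArrayList<>();
      for (List<String> input : inputs) {
        expected.addAll(input);
      }
      List<String> actual = new ArrayList<>();
      int iterations = 0;
      for (Iterator<String> it = concat(inputs); it.hasNext(); iterations++) {
        actual.add(it.next());
      }
      if (iterations != expected.size() || !actual.equals(expected)) {
        failures++;
      }
    }
    return failures;
  }
}
```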



##########
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java:
##########
@@ -67,4 +74,45 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)
       );
     }
   }
+
+  /**
+   * Writes a {@link Cursor} to a {@link Frame}. This method iterates over the rows of the cursor, and writes the columns
+   * to the frame.
+   * @param cursor Cursor to write to the frame
+   * @param frameWriterFactory Frame writer factory to write to the frame.
+   *                           Determines the signature of the rows that are written to the frames
+   * @param memoryLimitBytes Limit in bytes, if one needs to be enforced while converting the cursor to the frame. If adding
+   *                         a row causes the frame size to exceed this limit, we throw a {@link ResourceLimitExceededException}
+   */
+  public static Frame cursorToFrame(
+      Cursor cursor,
+      FrameWriterFactory frameWriterFactory,
+      @Nullable Long memoryLimitBytes
+  )
+  {
+    Frame frame;
+
+    try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+      while (!cursor.isDone()) {
+        if (!frameWriter.addSelection()) {
+          throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
+        }
+
+        if (memoryLimitBytes != null && memoryLimitBytes < frameWriter.getTotalSize()) {
+          throw new ResourceLimitExceededException(
+              StringUtils.format(
+                  "The row limit exceeded the bytes allocated for the conversion (allocated = [%d] bytes)",

Review Comment:
   Suggested message: `Exceeded total bytes allocated for this subquery. Current size [%d], total row count [%d], allocated size [%d]. Try limiting the results of the subquery to xxx bytes (the preferred method), or increase the limit via the context parameter xxx.`
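A sketch of the check with the proposed message, assuming the current size and row count are available at the point of the check. `SubqueryLimitMessageSketch` is hypothetical and throws `IllegalStateException` in place of `ResourceLimitExceededException`; the remediation sentence is omitted because the limit and context parameter names are left as "xxx" in the review.

```java
class SubqueryLimitMessageSketch {
  // Throws when the bytes written so far exceed the allocated limit; a null limit disables the check.
  static void checkLimit(Long memoryLimitBytes, long currentSizeBytes, long rowCount) {
    if (memoryLimitBytes != null && currentSizeBytes > memoryLimitBytes) {
      throw new IllegalStateException(String.format(
          "Exceeded total bytes allocated for this subquery. "
          + "Current size [%d], total row count [%d], allocated size [%d].",
          currentSizeBytes,
          rowCount,
          memoryLimitBytes
      ));
    }
  }
}
```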



##########
processing/src/main/java/org/apache/druid/query/FramesBackedInlineDataSourceSerializer.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Serializes {@link FramesBackedInlineDataSource} to the representation of {@link InlineDataSource}
+ * so that the data transferred over the wire between servers doesn't change. {@link FramesBackedInlineDataSource} is
+ * currently used only on the Brokers, so this serializer converts the object into a representation that the data
+ * servers can recognize.
+ */
+public class FramesBackedInlineDataSourceSerializer extends StdSerializer<FramesBackedInlineDataSource>
+{
+  public FramesBackedInlineDataSourceSerializer()
+  {
+    super(FramesBackedInlineDataSource.class);
+  }
+
+  @Override
+  public void serialize(FramesBackedInlineDataSource value, JsonGenerator jg, SerializerProvider serializers)
+      throws IOException
+  {
+    jg.writeStartObject();
+    jg.writeStringField("type", "inline");
+
+    RowSignature rowSignature = value.getRowSignature();
+    jg.writeObjectField("columnNames", rowSignature.getColumnNames());
+    List<ColumnType> columnTypes = IntStream.range(0, rowSignature.size())
+                                            .mapToObj(i -> rowSignature.getColumnType(i).orElse(null))
+                                            .collect(Collectors.toList());
+    jg.writeObjectField("columnTypes", columnTypes);
+
+    jg.writeArrayFieldStart("rows");
+
+    value.getRowsAsSequence().forEach(row -> {
+      try {
+        JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, row);
+      }
+      catch (IOException e) {
+        // Ideally, this shouldn't be reachable.
+        // Wrap the IOException in a RuntimeException and propagate it forward
+        throw new RuntimeException(e);

Review Comment:
   Please add an exception message such as ```Exception encountered while serializing `FramesBackedInlineDataSource` ```
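A small sketch of the pattern, with a plain `java.io.Writer` standing in for the Jackson `JsonGenerator`: the checked `IOException` thrown inside the `forEach` lambda is wrapped with a message naming the datasource being serialized. `RowSerializationSketch` is hypothetical; it throws `UncheckedIOException`, a `RuntimeException` subclass, where the diff throws a bare `RuntimeException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

class RowSerializationSketch {
  // Writes each row on its own line; any IOException is wrapped so the failing context
  // shows up in the exception message instead of being lost.
  static void writeRows(List<Object[]> rows, Writer out) {
    rows.forEach(row -> {
      try {
        out.write(Arrays.toString(row));
        out.write('\n');
      }
      catch (IOException e) {
        throw new UncheckedIOException("Exception encountered while serializing FramesBackedInlineDataSource", e);
      }
    });
  }
}
```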



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

