paul-rogers commented on code in PR #12745:
URL: https://github.com/apache/druid/pull/12745#discussion_r915074068


##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the number of rows or regions.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:
+ *
+ * - 1 byte: {@link FrameType#version()}
+ * - 8 bytes: size in bytes of the frame, little-endian long
+ * - 4 bytes: number of rows, little-endian int
+ * - 4 bytes: number of regions, little-endian int
+ * - 1 byte: 0 if frame is nonpermuted, 1 if frame is permuted
+ * - 4 bytes x numRows: permutation section; only present for permuted frames. Array of little-endian ints mapping
+ * logical row numbers to physical row numbers.
+ * - 8 bytes x numRegions: region offsets. Array of end offsets of each region (exclusive), relative to start of frame,
+ * as little-endian longs.
+ * - NNN bytes: regions, back-to-back.
+ *
+ * There is also a compressed frame format. Compressed frames are written by {@link #writeTo} when "compress" is
+ * true, and decompressed by {@link #decompress}. Format:
+ *
+ * - 8 bytes: compressed frame length, little-endian long
+ * - 8 bytes: uncompressed frame length (numBytes), little-endian long
+ * - NNN bytes: LZ4-compressed frame
+ * - 8 bytes: 64-bit xxhash checksum of prior content, including 16-byte header and compressed frame, little-endian long
+ */
+public class Frame

Review Comment:
   Frame is concrete. It appears to assume heap memory. One could imagine 
wanting a direct memory version for better memory control. Should this class be 
an interface (or abstract) with, at present, one concrete implementation?
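
   To make the suggestion concrete, here is a rough sketch of the shape I have in mind. All names (`FrameStorage`, `BufferFrameStorage`) are hypothetical and not part of this PR, and it uses a plain `ByteBuffer` rather than the `Memory` abstraction the class actually wraps, so treat it as an illustration only:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical names throughout: FrameStorage and BufferFrameStorage are not in the PR.
interface FrameStorage
{
  long numBytes();

  long getLong(long offset);

  boolean isDirect();
}

// A single implementation backs both cases; callers only ever see the interface.
final class BufferFrameStorage implements FrameStorage
{
  private final ByteBuffer buffer;

  private BufferFrameStorage(final ByteBuffer buffer)
  {
    // The frame format is documented as little-endian.
    this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN);
  }

  static FrameStorage onHeap(final int size)
  {
    return new BufferFrameStorage(ByteBuffer.allocate(size));
  }

  static FrameStorage offHeap(final int size)
  {
    return new BufferFrameStorage(ByteBuffer.allocateDirect(size));
  }

  @Override
  public long numBytes()
  {
    return buffer.capacity();
  }

  @Override
  public long getLong(final long offset)
  {
    return buffer.getLong(Math.toIntExact(offset));
  }

  @Override
  public boolean isDirect()
  {
    return buffer.isDirect();
  }
}
```

   With this shape, code that reads frames depends only on `FrameStorage`, and the heap versus direct decision is made once, where the frame is created.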



##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+/**
+ * A data frame.
+ *
+ * Frames are split into contiguous "regions". With columnar frames ({@link FrameType#COLUMNAR}) each region
+ * is a column. With row-based frames ({@link FrameType#ROW_BASED}) there are always two regions: row offsets
+ * and row data.
+ *
+ * This object is lightweight. It has constant overhead regardless of the number of rows or regions.
+ *
+ * Frames are written with {@link org.apache.druid.frame.write.FrameWriter} and read with
+ * {@link org.apache.druid.frame.read.FrameReader}.
+ *
+ * Frame format:

Review Comment:
   Did we consider using Protobuf or Thrift for the header portion? With a 
home-grown format, we will add code over time to handle version compatibility. 
Protobuf does that for us already.
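
   For reference, this is roughly the hand-rolled decoding the documented header implies. It is a sketch only: `FrameHeaderSketch` is a hypothetical name, it uses `ByteBuffer` instead of `Memory`, and it covers just the fixed 18-byte header from the Javadoc (version, size, rows, regions, permuted flag). Any field added later would need a version branch inside `parse`, which is the maintenance cost I am asking about:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper, not part of the PR: decodes only the fixed-size header.
final class FrameHeaderSketch
{
  // 1 (version) + 8 (size) + 4 (rows) + 4 (regions) + 1 (permuted flag) = 18 bytes
  static final int HEADER_SIZE = Byte.BYTES + Long.BYTES + Integer.BYTES + Integer.BYTES + Byte.BYTES;

  final byte version;
  final long numBytes;
  final int numRows;
  final int numRegions;
  final boolean permuted;

  private FrameHeaderSketch(byte version, long numBytes, int numRows, int numRegions, boolean permuted)
  {
    this.version = version;
    this.numBytes = numBytes;
    this.numRows = numRows;
    this.numRegions = numRegions;
    this.permuted = permuted;
  }

  static FrameHeaderSketch parse(final ByteBuffer source)
  {
    // duplicate() leaves the caller's position untouched; byte order must be set on the copy.
    final ByteBuffer buf = source.duplicate().order(ByteOrder.LITTLE_ENDIAN);

    if (buf.remaining() < HEADER_SIZE) {
      throw new IllegalArgumentException("Memory too short for a header");
    }

    // Fields are read in the order the Javadoc lists them.
    final byte version = buf.get();
    final long numBytes = buf.getLong();
    final int numRows = buf.getInt();
    final int numRegions = buf.getInt();
    final boolean permuted = buf.get() != 0;

    return new FrameHeaderSketch(version, numBytes, numRows, numRegions, permuted);
  }
}
```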



##########
processing/src/main/java/org/apache/druid/frame/field/DoubleFieldWriter.java:
##########
@@ -0,0 +1,82 @@
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.segment.BaseDoubleColumnValueSelector;
+
+/**
+ * Wraps a {@link BaseDoubleColumnValueSelector} and writes field values.
+ *
+ * Values are transformed such that they are comparable as bytes; see {@link #transform} and {@link #detransform}.
+ * Values are preceded by a null byte that is either 0x00 (null) or 0x01 (not null). This ensures that nulls sort
+ * earlier than nonnulls.
+ */
+public class DoubleFieldWriter implements FieldWriter
+{
+  public static final int SIZE = Double.BYTES + Byte.BYTES;
+  public static final byte NULL_BYTE = 0x00;
+  public static final byte NOT_NULL_BYTE = 0x01;
+
+  private final BaseDoubleColumnValueSelector selector;
+
+  public DoubleFieldWriter(final BaseDoubleColumnValueSelector selector)
+  {
+    this.selector = selector;
+  }
+
+  @Override
+  public long writeTo(final WritableMemory memory, final long position, final long maxSize)

Review Comment:
   Example of the above comment. Here, the caller has to keep track of the 
offset for each field, and update that offset using the returned length. That's 
quite a bit of accounting if we have, say, 100 fields!
   
   This class appears to be static: it accepts the memory, the position and a 
value (from a selector?) but yet another thing keeps track of offsets.
   
   Might be cleaner if this object is created per field and tracks offsets 
internally. If we assume sequential writes (that is, row-by-row), then the 
write is only moving forward: works for fixed-width and variable-width values. 
If we allow random writes (not sure how we'd do that), can we explain the logic?
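
   A minimal sketch of the per-field, offset-tracking writer I am describing, assuming sequential writes only. `SequentialDoubleWriterSketch` is a hypothetical name, it writes to a `ByteBuffer` rather than `WritableMemory`, and it stores plain IEEE 754 bits instead of applying the PR's `transform` step:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical class, not in the PR: a per-field writer that owns its write offset.
final class SequentialDoubleWriterSketch
{
  static final int SIZE = Double.BYTES + Byte.BYTES;
  static final byte NULL_BYTE = 0x00;
  static final byte NOT_NULL_BYTE = 0x01;

  private final ByteBuffer buffer;
  private int position;

  SequentialDoubleWriterSketch(final ByteBuffer buffer, final int startPosition)
  {
    this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN);
    this.position = startPosition;
  }

  /**
   * Writes one value (null allowed) at the current offset and advances.
   * Returns false, writing nothing, if the value does not fit.
   */
  boolean write(final Double value)
  {
    if (buffer.capacity() - position < SIZE) {
      return false;
    }

    // Null marker first, then the value bytes (zero for nulls).
    buffer.put(position, value == null ? NULL_BYTE : NOT_NULL_BYTE);
    // Assumption: plain double bits; the byte-comparable transform is omitted here.
    buffer.putDouble(position + Byte.BYTES, value == null ? 0.0 : value);
    position += SIZE;
    return true;
  }

  int position()
  {
    return position;
  }
}
```

   The caller then just calls `write` once per row, and the offset accounting stays inside the writer.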
   
   
   Did we consider using a row offset so we can do the math for fixed fields as 
above? For variable-width fields, is there



##########
processing/src/main/java/org/apache/druid/frame/field/RowMemoryFieldPointer.java:
##########
@@ -0,0 +1,57 @@
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.segment.row.ReadableFrameRowPointer;
+
+/**
+ * A {@link ReadableFieldPointer} that is derived from a row-based frame.
+ */
+public class RowMemoryFieldPointer implements ReadableFieldPointer
+{
+  private final Memory memory;
+  private final ReadableFrameRowPointer rowPointer;
+  private final int fieldNumber;
+  private final int fieldCount;
+
+  public RowMemoryFieldPointer(
+      final Memory memory,
+      final ReadableFrameRowPointer rowPointer,
+      final int fieldNumber,
+      final int fieldCount
+  )
+  {
+    this.memory = memory;
+    this.rowPointer = rowPointer;
+    this.fieldNumber = fieldNumber;
+    this.fieldCount = fieldCount;
+  }
+
+  @Override
+  public long position()
+  {
+    if (fieldNumber == 0) {
+      return rowPointer.position() + (long) Integer.BYTES * fieldCount;
+    } else {
+      return rowPointer.position() + memory.getInt(rowPointer.position() + (long) Integer.BYTES * (fieldNumber - 1));
+    }

Review Comment:
   A comment to explain this logic would be helpful. A quick read suggests that 
field 0 is somehow special. For any other field, we get an indirection from 
our memory. For field 0, we seem to get a position at the end of...what? Maybe 
a small memory layout diagram would help?
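
   Here is my guess at the layout, inferred only from `position()` above, so please correct it if wrong: each row starts with `fieldCount` ints holding the end offset of each field relative to the row start, followed by the field bytes. Field 0 then starts right after that int array, and field N starts where field N - 1 ends. The sketch below (hypothetical class name, `ByteBuffer` in place of `Memory`) encodes that reading:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Assumed row layout, inferred from RowMemoryFieldPointer.position():
//
//   rowStart
//   | end[0] | end[1] | ... | end[n-1] | field 0 bytes | field 1 bytes | ...
//   |<---- 4 * fieldCount bytes ------>|
//
// Each end[i] is the end offset of field i, relative to rowStart.
final class RowFieldPositionSketch
{
  /** Expects rowData to be in little-endian byte order. */
  static long fieldPosition(final ByteBuffer rowData, final int rowStart, final int fieldNumber, final int fieldCount)
  {
    if (fieldNumber == 0) {
      // First field begins immediately after the array of end offsets.
      return rowStart + (long) Integer.BYTES * fieldCount;
    } else {
      // Any other field begins where the previous field ends.
      return rowStart + rowData.getInt(rowStart + Integer.BYTES * (fieldNumber - 1));
    }
  }
}
```

   For example, a two-field row starting at byte 3, with a 9-byte field 0 and a 5-byte field 1, would store end offsets 17 and 22; field 0 then sits at byte 11 and field 1 at byte 20.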



##########
processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java:
##########
@@ -0,0 +1,276 @@
+package org.apache.druid.frame.field;
+
+import com.google.common.base.Predicate;
+import com.google.common.primitives.Ints;
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.read.FrameReaderUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.RangeIndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Reads fields written by {@link StringFieldWriter} or {@link StringArrayFieldWriter}.
+ */
+public class StringFieldReader implements FieldReader
+{
+  private final boolean asArray;
+
+  StringFieldReader(final boolean asArray)
+  {
+    this.asArray = asArray;
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(Memory memory, ReadableFieldPointer fieldPointer)
+  {
+    return new Selector(memory, fieldPointer, null, asArray);
+  }
+
+  @Override
+  public DimensionSelector makeDimensionSelector(
+      Memory memory,
+      ReadableFieldPointer fieldPointer,
+      @Nullable ExtractionFn extractionFn
+  )
+  {
+    if (asArray) {
+      throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY);
+    }
+
+    return new Selector(memory, fieldPointer, extractionFn, false);
+  }
+
+  @Override
+  public boolean isComparable()
+  {
+    return true;
+  }
+
+  private static class Selector implements DimensionSelector
+  {
+    private final Memory memory;
+    private final ReadableFieldPointer fieldPointer;
+    @Nullable
+    private final ExtractionFn extractionFn;
+    private final boolean asArray;
+
+    private long currentFieldPosition = -1;
+    private final RangeIndexedInts indexedInts = new RangeIndexedInts();
+    private final List<ByteBuffer> currentUtf8Strings = new ArrayList<>();
+
+    private Selector(
+        final Memory memory,
+        final ReadableFieldPointer fieldPointer,
+        @Nullable final ExtractionFn extractionFn,
+        final boolean asArray
+    )
+    {
+      this.memory = memory;
+      this.fieldPointer = fieldPointer;
+      this.extractionFn = extractionFn;
+      this.asArray = asArray;
+    }
+
+    @Nullable
+    @Override
+    public Object getObject()
+    {
+      final List<ByteBuffer> currentStrings = computeCurrentUtf8Strings();
+      final int size = currentStrings.size();
+
+      if (size == 0) {
+        return asArray ? Collections.emptyList() : null;
+      } else if (size == 1) {
+        return asArray ? Collections.singletonList(lookupName(0)) : lookupName(0);
+      } else {
+        final List<String> strings = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+          strings.add(lookupName(i));
+        }
+        return strings;
+      }
+    }
+
+    @Override
+    public IndexedInts getRow()
+    {
+      indexedInts.setSize(computeCurrentUtf8Strings().size());
+      return indexedInts;
+    }
+
+    @Nullable
+    @Override
+    public String lookupName(int id)
+    {
+      final ByteBuffer byteBuffer = computeCurrentUtf8Strings().get(id);
+      final String s = byteBuffer != null ? StringUtils.fromUtf8(byteBuffer.duplicate()) : null;
+      return extractionFn == null ? s : extractionFn.apply(s);
+    }
+
+    @Override
+    public boolean supportsLookupNameUtf8()
+    {
+      return extractionFn == null;
+    }
+
+    @Nullable
+    @Override
+    public ByteBuffer lookupNameUtf8(int id)
+    {
+      if (extractionFn != null) {
+        throw new ISE("Cannot use lookupNameUtf8 on this selector");
+      }
+
+      return computeCurrentUtf8Strings().get(id);
+    }
+
+    @Override
+    public int getValueCardinality()
+    {
+      return CARDINALITY_UNKNOWN;
+    }

Review Comment:
   Some of these attributes seem related more to the *type* of the data than 
the actual *data*. Should there be a reader which can deliver strings, and can 
provide an object with the type info? That is, separate the representation into 
data and metadata?



##########
processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java:
##########
@@ -0,0 +1,99 @@
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DoubleColumnSelector;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.column.ValueTypes;
+
+import javax.annotation.Nullable;
+
+/**
+ * Reads values written by {@link DoubleFieldWriter}.
+ */
+public class DoubleFieldReader implements FieldReader
+{
+  DoubleFieldReader()
+  {
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(Memory memory, ReadableFieldPointer fieldPointer)
+  {
+    return new Selector(memory, fieldPointer);
+  }
+
+  @Override
+  public DimensionSelector makeDimensionSelector(
+      Memory memory,
+      ReadableFieldPointer fieldPointer,
+      @Nullable ExtractionFn extractionFn
+  )
+  {
+    return ValueTypes.makeNumericWrappingDimensionSelector(
+        ValueType.DOUBLE,
+        makeColumnValueSelector(memory, fieldPointer),
+        extractionFn
+    );
+  }
+
+  @Override
+  public boolean isComparable()
+  {
+    return true;
+  }
+
+  private static class Selector implements DoubleColumnSelector
+  {
+    private final Memory dataRegion;
+    private final ReadableFieldPointer fieldPointer;

Review Comment:
   A few more comments would be helpful here.
   
   At the memory level, a field pointer might point to the byte offset of the 
data. We see this in `getDouble()` below. That then requires something to 
advance the pointer 8 + 1 bytes per value (`Double.BYTES` plus the null byte).
   
   It can be handy to have a "row" context pointer. If we know we're reading 
row 5, we do the math: 5 * (8 + 1) = 45 is the byte offset.
   
   The row form moves the field width calculations into the reader: we don't 
need another layer that knows how much to advance the value.
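
   A sketch of the row-indexed form, assuming a region that holds only this field at a fixed width. The width is taken from `DoubleFieldWriter.SIZE` (`Double.BYTES + Byte.BYTES`); the class name is hypothetical, it reads from a `ByteBuffer`, and it skips the `detransform` step:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical reader, not in the PR: locates values by row number instead of a byte pointer.
final class FixedWidthDoubleColumnSketch
{
  static final int SIZE = Double.BYTES + Byte.BYTES; // same width as DoubleFieldWriter.SIZE
  static final byte NULL_BYTE = 0x00;

  private final ByteBuffer data;

  FixedWidthDoubleColumnSketch(final ByteBuffer data)
  {
    this.data = data.order(ByteOrder.LITTLE_ENDIAN);
  }

  /** Byte offset of the null marker for the given row. */
  static long offsetOfRow(final int row)
  {
    return (long) row * SIZE;
  }

  boolean isNull(final int row)
  {
    return data.get(Math.toIntExact(offsetOfRow(row))) == NULL_BYTE;
  }

  double getDouble(final int row)
  {
    // The value follows the one-byte null marker.
    // Assumption: plain double bits; the PR's detransform step is omitted.
    return data.getDouble(Math.toIntExact(offsetOfRow(row)) + Byte.BYTES);
  }
}
```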



##########
processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java:
##########
@@ -0,0 +1,276 @@
+/**
+ * Reads fields written by {@link StringFieldWriter} or {@link StringArrayFieldWriter}.
+ */
+public class StringFieldReader implements FieldReader
+{
+  private final boolean asArray;
+
+  StringFieldReader(final boolean asArray)
+  {
+    this.asArray = asArray;
+  }
+
+  @Override
+  public ColumnValueSelector<?> makeColumnValueSelector(Memory memory, ReadableFieldPointer fieldPointer)
+  {
+    return new Selector(memory, fieldPointer, null, asArray);
+  }
+
+  @Override
+  public DimensionSelector makeDimensionSelector(
+      Memory memory,
+      ReadableFieldPointer fieldPointer,
+      @Nullable ExtractionFn extractionFn
+  )
+  {
+    if (asArray) {
+      throw new ISE("Cannot call makeDimensionSelector on field of type [%s]", ColumnType.STRING_ARRAY);
+    }
+
+    return new Selector(memory, fieldPointer, extractionFn, false);
+  }
+
+  @Override
+  public boolean isComparable()
+  {
+    return true;
+  }
+
+  private static class Selector implements DimensionSelector

Review Comment:
   Perhaps a note about the relationship between a reader and a selector? A 
naive reader (me) would expect them to be the same thing. I have a string 
reader and an index. As I advance the index, I can read strings from string 
columns. A brief note on the model used here would be helpful.
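
   My current understanding of the model, which may well be wrong: the reader is a stateless factory for a field type, and the selector is a view bound to a pointer that something else advances. A stripped-down sketch of that split, with hypothetical names and a `LongSupplier` standing in for `ReadableFieldPointer`:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.LongSupplier;

// Hypothetical simplification of the reader/selector split; not the PR's classes.
final class LongFieldReaderSketch
{
  // The "reader" holds no position. It only knows how to build selectors for its field type.
  static LongSelectorSketch makeSelector(final ByteBuffer memory, final LongSupplier fieldPointer)
  {
    return new LongSelectorSketch(memory.duplicate().order(ByteOrder.LITTLE_ENDIAN), fieldPointer);
  }
}

final class LongSelectorSketch
{
  private final ByteBuffer memory;
  private final LongSupplier fieldPointer;

  LongSelectorSketch(final ByteBuffer memory, final LongSupplier fieldPointer)
  {
    this.memory = memory;
    this.fieldPointer = fieldPointer;
  }

  // The "selector" reads whatever the pointer currently points at; it never advances it.
  long getLong()
  {
    return memory.getLong(Math.toIntExact(fieldPointer.getAsLong()));
  }
}
```

   If that is the intended model, a sentence or two in the class Javadoc saying so would save the next reader some time.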



##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+public class Frame

Review Comment:
   Related: what is the memory management policy for a frame? When on heap, GC 
will handle releasing memory. If in direct memory, how is ownership handled?



##########
processing/src/main/java/org/apache/druid/frame/Frame.java:
##########
@@ -0,0 +1,448 @@
+public class Frame
+{
+  public static final long HEADER_SIZE =
+      Byte.BYTES /* version */ +
+      Long.BYTES /* total size */ +
+      Integer.BYTES /* number of rows */ +
+      Integer.BYTES /* number of columns */ +
+      Byte.BYTES /* permuted flag */;
+
+  private static final LZ4Compressor LZ4_COMPRESSOR = LZ4Factory.fastestInstance().fastCompressor();
+  private static final LZ4SafeDecompressor LZ4_DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor();
+  private static final int COMPRESSED_FRAME_HEADER_SIZE = Long.BYTES * 2; // Compressed, uncompressed lengths
+  private static final int COMPRESSED_FRAME_TRAILER_SIZE = Long.BYTES; // Checksum
+  private static final int COMPRESSED_FRAME_ENVELOPE_SIZE = COMPRESSED_FRAME_HEADER_SIZE
+                                                            + COMPRESSED_FRAME_TRAILER_SIZE;
+  private static final int CHECKSUM_SEED = 0;
+
+  private final Memory memory;
+  private final FrameType frameType;
+  private final long numBytes;
+  private final int numRows;
+  private final int numRegions;
+  private final boolean permuted;
+
+  private Frame(Memory memory, FrameType frameType, long numBytes, int numRows, int numRegions, boolean permuted)
+  {
+    this.memory = memory;
+    this.frameType = frameType;
+    this.numBytes = numBytes;
+    this.numRows = numRows;
+    this.numRegions = numRegions;
+    this.permuted = permuted;
+  }
+
+  /**
+   * Returns a frame backed by the provided Memory. This operation does not do any copies or allocations.
+   *
+   * The Memory must be in little-endian byte order.
+   *
+   * Behavior is undefined if the memory is modified anytime during the lifetime of the Frame object.
+   */
+  public static Frame wrap(final Memory memory)
+  {
+    if (memory.getTypeByteOrder() != ByteOrder.LITTLE_ENDIAN) {
+      throw new IAE("Memory must be little-endian");
+    }
+
+    if (memory.getCapacity() < HEADER_SIZE) {
+      throw new IAE("Memory too short for a header");
+    }
+
+    final byte version = memory.getByte(0);
+    final FrameType frameType = FrameType.forVersion(version);
+
+    if (frameType == null) {
+      throw new IAE("Unexpected byte [%s] at start of frame", version);
+    }
+
+    final long numBytes = memory.getLong(Byte.BYTES);
+    final int numRows = memory.getInt(Byte.BYTES + Long.BYTES);
+    final int numRegions = memory.getInt(Byte.BYTES + Long.BYTES + Integer.BYTES);
+    final boolean permuted = memory.getByte(Byte.BYTES + Long.BYTES + Integer.BYTES + Integer.BYTES) != 0;
+
+    if (numBytes != memory.getCapacity()) {
+      throw new IAE("Declared size [%,d] does not match actual size [%,d]", numBytes, memory.getCapacity());
+    }
+
+    // Size of permuted row indices.
+    final long rowOrderSize = (permuted ? (long) numRows * Integer.BYTES : 0);
+
+    // Size of region ending positions.
+    final long regionEndSize = (long) numRegions * Long.BYTES;
+
+    final long expectedSizeForPreamble = HEADER_SIZE + rowOrderSize + regionEndSize;
+
+    if (numBytes < expectedSizeForPreamble) {
+      throw new IAE("Memory too short for preamble");
+    }
+
+    // Verify each region is wholly contained within this buffer.
+    long regionStart = expectedSizeForPreamble; // First region starts immediately after preamble.
+    long regionEnd;
+
+    for (int regionNumber = 0; regionNumber < numRegions; regionNumber++) {
+      regionEnd = memory.getLong(HEADER_SIZE + rowOrderSize + (long) regionNumber * Long.BYTES);
+
+      if (regionEnd < regionStart || regionEnd > numBytes) {
+        throw new ISE(
+            "Region [%d] invalid: end [%,d] out of range [%,d -> %,d]",
+            regionNumber,
+            regionEnd,
+            expectedSizeForPreamble,
+            numBytes
+        );
+      }
+
+      if (regionNumber == 0) {
+        regionStart = expectedSizeForPreamble;
+      } else {
+        regionStart = memory.getLong(HEADER_SIZE + rowOrderSize + (long) (regionNumber - 1) * Long.BYTES);
+      }
+
+      if (regionStart < expectedSizeForPreamble || regionStart > numBytes) {
+        throw new ISE(
+            "Region [%d] invalid: start [%,d] out of range [%,d -> %,d]",
+            regionNumber,
+            regionStart,
+            expectedSizeForPreamble,
+            numBytes
+        );
+      }
+    }
+
+    return new Frame(memory, frameType, numBytes, numRows, numRegions, permuted);
+  }
+
+  /**
+   * Returns a frame backed by the provided ByteBuffer. This operation does not do any copies or allocations.
+   *
+   * The position and limit of the buffer are ignored. If you need them to be respected, call
+   * {@link ByteBuffer#slice()} first, or use {@link #wrap(Memory)} to wrap a particular region.
+   */
+  public static Frame wrap(final ByteBuffer buffer)
+  {
+    return wrap(Memory.wrap(buffer, ByteOrder.LITTLE_ENDIAN));
+  }
+
+  /**
+   * Returns a frame backed by the provided byte array. This operation does not do any copies or allocations.
+   *
+   * The position and limit of the buffer are ignored. If you need them to be respected, call
+   * {@link ByteBuffer#slice()} first, or use {@link #wrap(Memory)} to wrap a particular region.
+   */
+  public static Frame wrap(final byte[] bytes)
+  {
+    // Wrap using ByteBuffer, not Memory. Even though it's seemingly unnecessary, because ByteBuffers are re-wrapped
+    // with Memory anyway, this is beneficial because it enables zero-copy optimizations (search for "hasByteBuffer").
+    return wrap(ByteBuffer.wrap(bytes));
+  }
+
+  /**
+   * Decompresses the provided memory and returns a frame backed by that decompressed memory. This operation is
+   * safe even on corrupt data: it validates position, length, and checksum prior to decompressing.
+   *
+   * This operation allocates memory on-heap to store the decompressed frame.
+   */
+  public static Frame decompress(final Memory memory, final long position, final long length)

Review Comment:
   This static method means that there is only one way to decompress, and it 
requires that the decompressor itself be static. Should the codec be a separate 
class that contains the compressor, decompressor, and methods to do the work?
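
   To make the suggestion concrete, here is a rough sketch of what such a codec 
class might look like. Everything in it is hypothetical: `FrameCodec`, 
`DeflateFrameCodec`, and `FrameCodecDemo` are illustrative names that do not 
exist in this PR, and `java.util.zip` stands in for LZ4 only so that the sketch 
runs without extra dependencies. Checksum validation is left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical codec abstraction (not part of the PR): one object owns both directions. */
interface FrameCodec
{
  byte[] compress(byte[] uncompressed);

  byte[] decompress(byte[] compressed, int uncompressedLength);
}

/** Stand-in implementation. An LZ4-backed codec would hold its compressor and decompressor as fields. */
final class DeflateFrameCodec implements FrameCodec
{
  @Override
  public byte[] compress(byte[] uncompressed)
  {
    final Deflater deflater = new Deflater();
    try {
      deflater.setInput(uncompressed);
      deflater.finish();
      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      final byte[] chunk = new byte[4096];
      while (!deflater.finished()) {
        out.write(chunk, 0, deflater.deflate(chunk));
      }
      return out.toByteArray();
    }
    finally {
      deflater.end();
    }
  }

  @Override
  public byte[] decompress(byte[] compressed, int uncompressedLength)
  {
    final Inflater inflater = new Inflater();
    try {
      inflater.setInput(compressed);
      final ByteArrayOutputStream out = new ByteArrayOutputStream();
      final byte[] chunk = new byte[4096];
      while (!inflater.finished()) {
        final int n = inflater.inflate(chunk);
        if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
          throw new IllegalStateException("Truncated or corrupt input");
        }
        out.write(chunk, 0, n);
      }
      // Reject data whose size does not match what the caller declared.
      if (out.size() != uncompressedLength) {
        throw new IllegalStateException("Unexpected uncompressed length");
      }
      return out.toByteArray();
    }
    catch (DataFormatException e) {
      throw new IllegalStateException("Corrupt input", e);
    }
    finally {
      inflater.end();
    }
  }
}

final class FrameCodecDemo
{
  public static void main(String[] args)
  {
    final FrameCodec codec = new DeflateFrameCodec();
    final byte[] original = "frame bytes, frame bytes, frame bytes".getBytes(StandardCharsets.UTF_8);
    final byte[] restored = codec.decompress(codec.compress(original), original.length);
    System.out.println(Arrays.equals(original, restored));
  }
}
```

   With something like this, `Frame.decompress` could accept a codec instance 
(or delegate to a default one), so callers and tests can swap implementations 
without touching static state.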



##########
processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import com.google.common.base.Predicate;
+import com.google.common.primitives.Ints;
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.read.FrameReaderUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.RangeIndexedInts;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Reads fields written by {@link StringFieldWriter} or {@link StringArrayFieldWriter}.
+ */

Review Comment:
   Perhaps add a bit of discussion about the data layout here? Is the underlying 
value stored as the string itself, or as an index into a dictionary, as in 
segments? If it is the string itself, what is the layout: size + data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

