This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c6da2f30e8f Add fieldReader for row based frames (#16707)
c6da2f30e8f is described below

commit c6da2f30e8f33c570173b7ac649a829173187481
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Aug 13 14:04:41 2024 +0530

    Add fieldReader for row based frames (#16707)
    
    Add a new fieldReaders#makeRAC for RowBasedFrameRowsAndColumns.
---
 .../druid/frame/field/ComplexFieldReader.java      |  97 +++++-
 .../druid/frame/field/DoubleArrayFieldReader.java  |  10 +
 .../druid/frame/field/DoubleFieldReader.java       | 140 +++++++++
 .../druid/frame/field/FieldPositionHelper.java     |  65 ++++
 .../org/apache/druid/frame/field/FieldReader.java  |  12 +-
 .../druid/frame/field/FloatArrayFieldReader.java   |  10 +
 .../apache/druid/frame/field/FloatFieldReader.java | 140 +++++++++
 .../druid/frame/field/LongArrayFieldReader.java    |  10 +
 .../apache/druid/frame/field/LongFieldReader.java  | 141 ++++++++-
 .../druid/frame/field/NumericArrayFieldReader.java |   2 +-
 .../druid/frame/field/NumericFieldReader.java      |   7 +-
 .../druid/frame/field/StringFieldReader.java       | 347 ++++++++++++++++++---
 .../concrete/RowBasedFrameRowsAndColumns.java      |  17 +-
 .../druid/frame/field/DoubleFieldReaderTest.java   |  30 ++
 .../druid/frame/field/FieldReaderRACTest.java      |  80 +++++
 .../druid/frame/field/FloatFieldReaderTest.java    |  30 ++
 .../druid/frame/field/LongFieldReaderTest.java     |  31 ++
 .../query/rowsandcols/RowsAndColumnsTestBase.java  |   5 +-
 .../ColumnBasedFrameRowsAndColumnsTest.java        |   6 +-
 .../concrete/RowBasedFrameRowsAndColumnsTest.java  |  80 +++++
 20 files changed, 1190 insertions(+), 70 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
index 29bf0945adb..75ad70d5cb4 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
@@ -21,24 +21,32 @@ package org.apache.druid.frame.field;
 
 import com.google.common.base.Preconditions;
 import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import 
org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.ObjectColumnSelector;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
 import org.apache.druid.segment.serde.ComplexMetrics;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.util.Comparator;
 
 /**
  * Reads values written by {@link ComplexFieldWriter}.
- *
+ * <p>
  * Format:
- *
+ * <p>
  * - 1 byte: {@link ComplexFieldWriter#NULL_BYTE} or {@link 
ComplexFieldWriter#NOT_NULL_BYTE}
  * - 4 bytes: length of serialized complex value, little-endian int
  * - N bytes: serialized complex value
@@ -121,7 +129,7 @@ public class ComplexFieldReader implements FieldReader
    * Alternative interface to read the field from the memory without creating 
a selector and field pointer
    */
   @Nullable
-  public static Object readFieldFromMemory(
+  public static <T> T readFieldFromMemory(
       final ComplexMetricSerde serde,
       final Memory memory,
       final long position
@@ -136,7 +144,8 @@ public class ComplexFieldReader implements FieldReader
       final byte[] bytes = new byte[length];
       memory.getByteArray(position + ComplexFieldWriter.HEADER_SIZE, bytes, 0, 
length);
 
-      return serde.fromBytes(bytes, 0, length);
+      //noinspection unchecked
+      return (T) serde.fromBytes(bytes, 0, length);
     } else {
       throw new ISE("Unexpected null byte [%s]", nullByte);
     }
@@ -166,8 +175,8 @@ public class ComplexFieldReader implements FieldReader
     @Override
     public T getObject()
     {
-      //noinspection unchecked
-      return (T) readFieldFromMemory(serde, memory, fieldPointer.position());
+      final long fieldPosition = fieldPointer.position();
+      return readFieldFromMemory(serde, memory, fieldPosition);
     }
 
     @Override
@@ -183,4 +192,80 @@ public class ComplexFieldReader implements FieldReader
       // Do nothing.
     }
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    return new ComplexFieldReaderColumn(frame, signature.indexOf(columnName), 
signature.size());
+  }
+
+  private class ComplexFieldReaderColumn implements Column
+  {
+    private final Frame frame;
+    private final Memory dataRegion;
+    private final ColumnType type;
+    private final FieldPositionHelper coach;
+
+    public ComplexFieldReaderColumn(Frame frame, int columnIndex, int 
numFields)
+    {
+      this.frame = frame;
+      dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+      this.type = ColumnType.ofComplex(serde.getTypeName());
+      this.coach = new FieldPositionHelper(
+          frame,
+          frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+          dataRegion,
+          columnIndex,
+          numFields
+      );
+    }
+
+    @Nonnull
+    @Override
+    public ColumnAccessor toAccessor()
+    {
+      return new ObjectColumnAccessorBase()
+      {
+        @Override
+        public ColumnType getType()
+        {
+          return type;
+        }
+
+        @Override
+        public int numRows()
+        {
+          return frame.numRows();
+        }
+
+        @Override
+        public boolean isNull(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+          return dataRegion.getByte(fieldPosition) == 
ComplexFieldWriter.NULL_BYTE;
+        }
+
+        @Override
+        protected Object getVal(int rowNum)
+        {
+          return readFieldFromMemory(serde, dataRegion, 
coach.computeFieldPosition(rowNum));
+        }
+
+        @Override
+        protected Comparator<Object> getComparator()
+        {
+          return serde.getTypeStrategy();
+        }
+
+      };
+    }
+
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      return null;
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
 
b/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
index ec7de095e12..2e57a3e9a5c 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
@@ -20,7 +20,11 @@
 package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 
@@ -60,4 +64,10 @@ public class DoubleArrayFieldReader extends 
NumericArrayFieldReader
       }
     };
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    throw NotYetImplemented.ex(null, "Class cannot create an RAC column.");
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
index 7f7a3f8639e..3afadebe711 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
@@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DoubleColumnSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 /**
  * Reads the values produced by {@link DoubleFieldWriter}
  *
@@ -99,4 +108,135 @@ public class DoubleFieldReader extends NumericFieldReader
       return super._isNull();
     }
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    return new DoubleFieldReaderColumn(frame, signature.indexOf(columnName), 
signature.size());
+  }
+
+  private class DoubleFieldReaderColumn implements Column
+  {
+    private final Frame frame;
+    private final Memory dataRegion;
+    private final FieldPositionHelper coach;
+
+    public DoubleFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+    {
+      this.frame = frame;
+      dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+      this.coach = new FieldPositionHelper(
+          frame,
+          frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+          dataRegion,
+          columnIndex,
+          numFields
+      );
+    }
+
+    @Nonnull
+    @Override
+    public ColumnAccessor toAccessor()
+    {
+      return new ColumnAccessor()
+      {
+        @Override
+        public ColumnType getType()
+        {
+          return ColumnType.DOUBLE;
+        }
+
+        @Override
+        public int numRows()
+        {
+          return frame.numRows();
+        }
+
+        @Override
+        public boolean isNull(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+          return dataRegion.getByte(fieldPosition) == getNullIndicatorByte();
+        }
+
+        @Nullable
+        @Override
+        public Object getObject(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+          if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+            return null;
+          } else {
+            return getDoubleAtPosition(fieldPosition);
+          }
+        }
+
+        @Override
+        public double getDouble(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+          if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+            return 0L;
+          } else {
+            return getDoubleAtPosition(fieldPosition);
+          }
+        }
+
+        @Override
+        public float getFloat(int rowNum)
+        {
+          return (float) getDouble(rowNum);
+        }
+
+        @Override
+        public long getLong(int rowNum)
+        {
+          return (long) getDouble(rowNum);
+        }
+
+        @Override
+        public int getInt(int rowNum)
+        {
+          return (int) getDouble(rowNum);
+        }
+
+        @Override
+        public int compareRows(int lhsRowNum, int rhsRowNum)
+        {
+          long lhsPosition = coach.computeFieldPosition(lhsRowNum);
+          long rhsPosition = coach.computeFieldPosition(rhsRowNum);
+
+          final byte nullIndicatorByte = getNullIndicatorByte();
+          if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
+            if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+              return 0;
+            } else {
+              return -1;
+            }
+          } else {
+            if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+              return 1;
+            } else {
+              return Double.compare(getDoubleAtPosition(lhsPosition), 
getDoubleAtPosition(rhsPosition));
+            }
+          }
+        }
+
+        private double getDoubleAtPosition(long lhsPosition)
+        {
+          return 
TransformUtils.detransformToDouble(dataRegion.getLong(lhsPosition + 
Byte.BYTES));
+        }
+      };
+    }
+
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      return null;
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java
 
b/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java
new file mode 100644
index 00000000000..d4abe5300b4
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+
+/**
+ * Helps compute the field position for a frame from the different regions in 
the frame.
+ */
+public class FieldPositionHelper
+{
+  private final Frame frame;
+  private final Memory offsetRegion;
+  private final Memory dataRegion;
+  private final int columnIndex;
+  private final long fieldsBytesSize;
+
+  public FieldPositionHelper(
+      Frame frame,
+      Memory offsetRegion,
+      Memory dataRegion,
+      int columnIndex,
+      int numFields
+  )
+  {
+    this.frame = frame;
+    this.offsetRegion = offsetRegion;
+    this.dataRegion = dataRegion;
+    this.columnIndex = columnIndex;
+    this.fieldsBytesSize = this.columnIndex == 0
+                           ? ((long) numFields) * Integer.BYTES
+                           : ((long) (this.columnIndex - 1)) * Integer.BYTES;
+  }
+
+  public long computeFieldPosition(int rowNum)
+  {
+    rowNum = frame.physicalRow(rowNum);
+    final long rowPosition = rowNum == 0 ? 0 : offsetRegion.getLong(((long) 
rowNum - 1) * Long.BYTES);
+    final long fieldPosition;
+    if (columnIndex == 0) {
+      fieldPosition = rowPosition + fieldsBytesSize;
+    } else {
+      fieldPosition = rowPosition + dataRegion.getInt(rowPosition + 
fieldsBytesSize);
+    }
+    return fieldPosition;
+  }
+}
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
index bc9d631361f..8cf0378071d 100644
--- a/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
+++ b/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
@@ -20,24 +20,32 @@
 package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.key.RowKey;
 import org.apache.druid.frame.key.RowKeyReader;
 import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 
 /**
  * Embeds the logic to read a specific field from row-based frames or from 
{@link RowKey}.
- *
+ * <p>
  * Most callers should use {@link org.apache.druid.frame.read.FrameReader} or
  * {@link RowKeyReader} rather than using this interface directly.
- *
+ * <p>
  * Stateless and immutable.
  */
 public interface FieldReader
 {
+  /**
+   * Create a {@link Column} which provides accses to the rows in the frame, 
via the {@link Column#toAccessor()}.
+   */
+  Column makeRACColumn(Frame frame, RowSignature signature, String columnName);
+
   /**
    * Create a {@link ColumnValueSelector} backed by some memory and a moveable 
pointer.
    */
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
 
b/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
index e97af071824..7252265ba8a 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
@@ -20,7 +20,11 @@
 package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 
@@ -60,4 +64,10 @@ public class FloatArrayFieldReader extends 
NumericArrayFieldReader
       }
     };
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    throw NotYetImplemented.ex(null, "Class cannot create an RAC column.");
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
index 6617d563d67..3fc7213c73e 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
@@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.FloatColumnSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 /**
  * Reads values written by {@link FloatFieldWriter}.
  *
@@ -99,4 +108,135 @@ public class FloatFieldReader extends NumericFieldReader
       return super._isNull();
     }
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    return new FloatFieldReaderColumn(frame, signature.indexOf(columnName), 
signature.size());
+  }
+
+  private class FloatFieldReaderColumn implements Column
+  {
+    private final Frame frame;
+    private final Memory dataRegion;
+    private final FieldPositionHelper coach;
+
+    public FloatFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+    {
+      this.frame = frame;
+      dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+      this.coach = new FieldPositionHelper(
+          frame,
+          frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+          dataRegion,
+          columnIndex,
+          numFields
+      );
+    }
+
+    @Nonnull
+    @Override
+    public ColumnAccessor toAccessor()
+    {
+      return new ColumnAccessor()
+      {
+        @Override
+        public ColumnType getType()
+        {
+          return ColumnType.FLOAT;
+        }
+
+        @Override
+        public int numRows()
+        {
+          return frame.numRows();
+        }
+
+        @Override
+        public boolean isNull(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+          return dataRegion.getByte(fieldPosition) == getNullIndicatorByte();
+        }
+
+        @Nullable
+        @Override
+        public Object getObject(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+          if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+            return null;
+          } else {
+            return getFloatAtPosition(fieldPosition);
+          }
+        }
+
+        @Override
+        public double getDouble(int rowNum)
+        {
+          return getFloat(rowNum);
+        }
+
+        @Override
+        public float getFloat(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+          if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+            return 0L;
+          } else {
+            return getFloatAtPosition(fieldPosition);
+          }
+        }
+
+        @Override
+        public long getLong(int rowNum)
+        {
+          return (long) getFloat(rowNum);
+        }
+
+        @Override
+        public int getInt(int rowNum)
+        {
+          return (int) getFloat(rowNum);
+        }
+
+        @Override
+        public int compareRows(int lhsRowNum, int rhsRowNum)
+        {
+          long lhsPosition = coach.computeFieldPosition(lhsRowNum);
+          long rhsPosition = coach.computeFieldPosition(rhsRowNum);
+
+          final byte nullIndicatorByte = getNullIndicatorByte();
+          if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
+            if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+              return 0;
+            } else {
+              return -1;
+            }
+          } else {
+            if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+              return 1;
+            } else {
+              return Float.compare(getFloatAtPosition(lhsPosition), 
getFloatAtPosition(rhsPosition));
+            }
+          }
+        }
+
+        private float getFloatAtPosition(long rhsPosition)
+        {
+          return 
TransformUtils.detransformToFloat(dataRegion.getInt(rhsPosition + Byte.BYTES));
+        }
+      };
+    }
+
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      return null;
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
 
b/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
index 8f7578c07d3..ee77223aefe 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
@@ -20,7 +20,11 @@
 package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 
@@ -60,4 +64,10 @@ public class LongArrayFieldReader extends 
NumericArrayFieldReader
       }
     };
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    throw NotYetImplemented.ex(null, "Class cannot create an RAC column.");
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
index 8f3bbbf0451..9b514c93087 100644
--- a/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
+++ b/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
@@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.Memory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.LongColumnSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 /**
  * Reads values written by {@link LongFieldWriter}.
  *
@@ -68,7 +77,6 @@ public class LongFieldReader extends NumericFieldReader
 
   private static class LongFieldSelector extends NumericFieldReader.Selector 
implements LongColumnSelector
   {
-
     final Memory dataRegion;
     final ReadableFieldPointer fieldPointer;
 
@@ -99,4 +107,135 @@ public class LongFieldReader extends NumericFieldReader
       return super._isNull();
     }
   }
+
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    return new LongFieldReaderColumn(frame, signature.indexOf(columnName), 
signature.size());
+  }
+
+  private class LongFieldReaderColumn implements Column
+  {
+    private final Frame frame;
+    private final Memory dataRegion;
+    private final FieldPositionHelper coach;
+
+    public LongFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+    {
+      this.frame = frame;
+      this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+      this.coach = new FieldPositionHelper(
+          frame,
+          frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+          dataRegion,
+          columnIndex,
+          numFields
+      );
+    }
+
+    @Nonnull
+    @Override
+    public ColumnAccessor toAccessor()
+    {
+      return new ColumnAccessor()
+      {
+        @Override
+        public ColumnType getType()
+        {
+          return ColumnType.LONG;
+        }
+
+        @Override
+        public int numRows()
+        {
+          return frame.numRows();
+        }
+
+        @Override
+        public boolean isNull(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+          return dataRegion.getByte(fieldPosition) == getNullIndicatorByte();
+        }
+
+        @Nullable
+        @Override
+        public Object getObject(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+          if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+            return null;
+          } else {
+            return getLongAtPosition(fieldPosition);
+          }
+        }
+
+        @Override
+        public double getDouble(int rowNum)
+        {
+          return getLong(rowNum);
+        }
+
+        @Override
+        public float getFloat(int rowNum)
+        {
+          return getLong(rowNum);
+        }
+
+        @Override
+        public long getLong(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+          if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+            return 0L;
+          } else {
+            return getLongAtPosition(fieldPosition);
+          }
+        }
+
+        @Override
+        public int getInt(int rowNum)
+        {
+          return (int) getLong(rowNum);
+        }
+
+        @Override
+        public int compareRows(int lhsRowNum, int rhsRowNum)
+        {
+          long lhsPosition = coach.computeFieldPosition(lhsRowNum);
+          long rhsPosition = coach.computeFieldPosition(rhsRowNum);
+
+          final byte nullIndicatorByte = getNullIndicatorByte();
+          if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
+            if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+              return 0;
+            } else {
+              return -1;
+            }
+          } else {
+            if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+              return 1;
+            } else {
+              return Long.compare(getLongAtPosition(lhsPosition), 
getLongAtPosition(rhsPosition));
+            }
+          }
+        }
+
+        private long getLongAtPosition(long rhsPosition)
+        {
+          return 
TransformUtils.detransformToLong(dataRegion.getLong(rhsPosition + Byte.BYTES));
+        }
+      };
+    }
+
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      return null;
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
 
b/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
index 8d6f3958b1e..92dfbc98596 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
 /**
  * Reader class for the fields written by {@link NumericArrayFieldWriter}. See 
the Javadoc for the writer for more
  * information on the format
- *
+ * <p>
  * The numeric array fields are byte comparable
  */
 public abstract class NumericArrayFieldReader implements FieldReader
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
index 1e11cfa65f3..bb0cc4faed6 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
@@ -50,6 +50,11 @@ public abstract class NumericFieldReader implements 
FieldReader
     }
   }
 
+  public byte getNullIndicatorByte()
+  {
+    return nullIndicatorByte;
+  }
+
   @Override
   public ColumnValueSelector<?> makeColumnValueSelector(Memory memory, 
ReadableFieldPointer fieldPointer)
   {
@@ -94,7 +99,7 @@ public abstract class NumericFieldReader implements 
FieldReader
   /**
    * Helper class which allows the inheritors to fetch the nullity of the 
field located at fieldPointer's position in
    * the dataRegion.
-   *
+   * <p>
    * The implementations of the column value selectors returned by the {@link 
#getColumnValueSelector} can inherit this
    * class and call {@link #_isNull()} in their {@link 
ColumnValueSelector#isNull()} to offload the responsibility of
    * detecting null elements to this Selector, instead of reworking the null 
handling
diff --git 
a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java 
b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
index 2513a2d2444..1c51a914e0d 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
@@ -23,51 +23,64 @@ import com.google.common.primitives.Ints;
 import it.unimi.dsi.fastutil.objects.ObjectArrays;
 import org.apache.datasketches.memory.Memory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.read.FrameReaderUtils;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.DruidPredicateFactory;
 import org.apache.druid.query.filter.ValueMatcher;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import 
org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.DimensionSelectorUtils;
 import org.apache.druid.segment.IdLookup;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.RangeIndexedInts;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 /**
  * Reads fields written by {@link StringFieldWriter} or {@link 
StringArrayFieldWriter}.
- *
+ * <p>
  * Strings are written in UTF8 and terminated by {@link 
StringFieldWriter#VALUE_TERMINATOR}. Note that this byte
  * appears in valid UTF8 encodings if and only if the string contains a NUL 
(char 0). Therefore, this field writer
  * cannot write out strings containing NUL characters.
- *
+ * <p>
  * All rows are terminated by {@link StringFieldWriter#ROW_TERMINATOR}.
- *
+ * <p>
  * Empty rows are represented in one byte: solely that {@link 
StringFieldWriter#ROW_TERMINATOR}. Rows that are null
  * themselves (i.e., a null array) are represented as a {@link 
StringFieldWriter#NULL_ROW} followed by a
  * {@link StringFieldWriter#ROW_TERMINATOR}. This encoding for null arrays is 
decoded by older readers as an
  * empty array; null arrays are a feature that did not exist in earlier 
versions of the code.
- *
+ * <p>
  * Null strings are stored as {@link StringFieldWriter#NULL_BYTE}. All other 
strings are prepended by
  * {@link StringFieldWriter#NOT_NULL_BYTE} byte to differentiate them from 
nulls.
- *
+ * <p>
  * This encoding allows the encoded data to be compared as bytes in a way that 
matches the behavior of
  * {@link 
org.apache.druid.segment.StringDimensionHandler#DIMENSION_SELECTOR_COMPARATOR}, 
except null and
  * empty list are not considered equal.
  */
 public class StringFieldReader implements FieldReader
 {
+  public static final byte[] EXPECTED_BYTES_FOR_NULL = {
+      StringFieldWriter.NULL_BYTE, StringFieldWriter.VALUE_TERMINATOR, 
StringFieldWriter.ROW_TERMINATOR
+  };
   private final boolean asArray;
 
   public StringFieldReader()
@@ -123,6 +136,16 @@ public class StringFieldReader implements FieldReader
     }
   }
 
+  @Override
+  public Column makeRACColumn(Frame frame, RowSignature signature, String 
columnName)
+  {
+    if (asArray) {
+      return new StringArrayFieldReaderColumn(frame, 
signature.indexOf(columnName), signature.size());
+    } else {
+      return new StringFieldReaderColumn(frame, signature.indexOf(columnName), 
signature.size());
+    }
+  }
+
   /**
    * Selector that reads a value from a location pointed to by {@link 
ReadableFieldPointer}.
    */
@@ -297,70 +320,296 @@ public class StringFieldReader implements FieldReader
     {
       currentUtf8StringsIsNull = false;
       currentUtf8Strings.clear();
+      currentUtf8StringsIsNull = addStringsToList(memory, fieldPosition, 
currentUtf8Strings);
+    }
+  }
 
-      long position = fieldPosition;
-      long limit = memory.getCapacity();
+  private static class StringFieldReaderColumn implements Column
+  {
+    private final Frame frame;
+    private final Memory dataRegion;
+    private final FieldPositionHelper coach;
 
-      boolean rowTerminatorSeen = false;
+    public StringFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+    {
+      this.frame = frame;
+      this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+      this.coach = new FieldPositionHelper(
+          frame,
+          frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+          dataRegion,
+          columnIndex,
+          numFields
+      );
+    }
 
-      while (position < limit && !rowTerminatorSeen) {
-        final byte kind = memory.getByte(position);
-        position++;
+    @Nonnull
+    @Override
+    public ColumnAccessor toAccessor()
+    {
+      return new ObjectColumnAccessorBase()
+      {
+        @Override
+        public ColumnType getType()
+        {
+          return ColumnType.STRING;
+        }
 
-        switch (kind) {
-          case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte 
value)
-            if (position == fieldPosition + 1) {
-              // It was NULL_ROW.
-              currentUtf8StringsIsNull = true;
+        @Override
+        public int numRows()
+        {
+          return frame.numRows();
+        }
+
+        @Override
+        public boolean isNull(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+          byte[] nullBytes = new byte[3];
+          dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
+          return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
+        }
+
+        @Override
+        public int compareRows(int lhsRowNum, int rhsRowNum)
+        {
+          ByteBuffer lhs = 
getUTF8BytesAtPosition(coach.computeFieldPosition(lhsRowNum));
+          ByteBuffer rhs = 
getUTF8BytesAtPosition(coach.computeFieldPosition(rhsRowNum));
+
+          if (lhs == null) {
+            if (rhs == null) {
+              return 0;
+            } else {
+              return -1;
+            }
+          } else {
+            if (rhs == null) {
+              return 1;
+            } else {
+              return lhs.compareTo(rhs);
             }
+          }
+        }
 
-            // Skip; next byte will be a null/not-null byte or a row 
terminator.
-            break;
+        @Override
+        protected Object getVal(int rowNum)
+        {
+          return getStringAtPosition(coach.computeFieldPosition(rowNum));
+        }
 
-          case StringFieldWriter.ROW_TERMINATOR:
-            // Skip; this is the end of the row, so we'll fall through to the 
return statement.
-            rowTerminatorSeen = true;
-            break;
+        @Override
+        protected Comparator<Object> getComparator()
+        {
+          // we implement compareRows and thus don't need to actually 
implement this method
+          throw new UnsupportedOperationException();
+        }
 
-          case StringFieldWriter.NULL_BYTE:
-            currentUtf8Strings.add(null);
-            break;
+        @Nullable
+        private String getStringAtPosition(long fieldPosition)
+        {
+          return 
StringUtils.fromUtf8Nullable(getUTF8BytesAtPosition(fieldPosition));
+        }
 
-          case StringFieldWriter.NOT_NULL_BYTE:
-            for (long i = position; ; i++) {
-              if (i >= limit) {
-                throw new ISE("Value overrun");
-              }
+        @Nullable
+        private ByteBuffer getUTF8BytesAtPosition(long fieldPosition)
+        {
+          ArrayList<ByteBuffer> buffers = new ArrayList<>();
+          final boolean isNull = addStringsToList(dataRegion, fieldPosition, 
buffers);
+          if (isNull) {
+            return null;
+          } else {
+            if (buffers.size() > 1) {
+              throw DruidException.defensive(
+                  "Can only work with single-valued strings, should use a 
COMPLEX or ARRAY typed Column instead"
+              );
+            }
+            return buffers.get(0);
+          }
+        }
+      };
+    }
 
-              final byte b = memory.getByte(i);
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      return null;
+    }
+  }
 
-              if (b == StringFieldWriter.VALUE_TERMINATOR) {
-                final int len = Ints.checkedCast(i - position);
+  private static class StringArrayFieldReaderColumn implements Column
+  {
+    private final Frame frame;
+    private final Memory dataRegion;
+    private final FieldPositionHelper coach;
+
+    public StringArrayFieldReaderColumn(Frame frame, int columnIndex, int 
numFields)
+    {
+      this.frame = frame;
+      this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+      this.coach = new FieldPositionHelper(
+          frame,
+          frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+          this.dataRegion,
+          columnIndex,
+          numFields
+      );
+    }
+
+    @Nonnull
+    @Override
+    public ColumnAccessor toAccessor()
+    {
+      return new ObjectColumnAccessorBase()
+      {
+        @Override
+        public ColumnType getType()
+        {
+          return ColumnType.STRING_ARRAY;
+        }
+
+        @Override
+        public int numRows()
+        {
+          return frame.numRows();
+        }
+
+        @Override
+        public boolean isNull(int rowNum)
+        {
+          final long fieldPosition = coach.computeFieldPosition(rowNum);
+          byte[] nullBytes = new byte[3];
+          dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
+          return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
+        }
+
+        @Override
+        public int compareRows(int lhsRowNum, int rhsRowNum)
+        {
+          throw NotYetImplemented.ex(
+              null,
+              "Should implement this by comparing the actual bytes in the 
frame, they should be comparable"
+          );
+        }
+
+        @Override
+        protected Object getVal(int rowNum)
+        {
+          return getStringsAtPosition(coach.computeFieldPosition(rowNum));
+        }
+
+        @Override
+        protected Comparator<Object> getComparator()
+        {
+          // we implement compareRows and thus don't need to actually 
implement this method
+          throw new UnsupportedOperationException();
+        }
+
+        @Nullable
+        private List<String> getStringsAtPosition(long fieldPosition)
+        {
+          final List<ByteBuffer> bufs = getUTF8BytesAtPosition(fieldPosition);
+          if (bufs == null) {
+            return null;
+          }
+
+          final ArrayList<String> retVal = new ArrayList<>(bufs.size());
+          for (ByteBuffer buf : bufs) {
+            retVal.add(StringUtils.fromUtf8Nullable(buf));
+          }
+          return retVal;
+        }
 
-                if (len == 0 && NullHandling.replaceWithDefault()) {
-                  // Empty strings and nulls are the same in this mode.
-                  currentUtf8Strings.add(null);
-                } else {
-                  final ByteBuffer buf = 
FrameReaderUtils.readByteBuffer(memory, position, len);
-                  currentUtf8Strings.add(buf);
-                }
+        @Nullable
+        private List<ByteBuffer> getUTF8BytesAtPosition(long fieldPosition)
+        {
+          ArrayList<ByteBuffer> buffers = new ArrayList<>();
+          final boolean isNull = addStringsToList(dataRegion, fieldPosition, 
buffers);
+          if (isNull) {
+            return null;
+          } else {
+            return buffers;
+          }
+        }
+      };
+    }
 
-                position += len;
+    @Nullable
+    @Override
+    public <T> T as(Class<? extends T> clazz)
+    {
+      return null;
+    }
+  }
 
-                break;
+  private static boolean addStringsToList(Memory memory, long fieldPosition, 
List<ByteBuffer> list)
+  {
+    long position = fieldPosition;
+    long limit = memory.getCapacity();
+
+    boolean rowTerminatorSeen = false;
+    boolean isEffectivelyNull = false;
+
+    while (position < limit && !rowTerminatorSeen) {
+      final byte kind = memory.getByte(position);
+      position++;
+
+      switch (kind) {
+        case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte 
value)
+          if (position == fieldPosition + 1) {
+            // It was NULL_ROW.
+            isEffectivelyNull = true;
+          }
+
+          // Skip; next byte will be a null/not-null byte or a row terminator.
+          break;
+
+        case StringFieldWriter.ROW_TERMINATOR:
+          // Skip; this is the end of the row, so we'll fall through to the 
return statement.
+          rowTerminatorSeen = true;
+          break;
+
+        case StringFieldWriter.NULL_BYTE:
+          list.add(null);
+          break;
+
+        case StringFieldWriter.NOT_NULL_BYTE:
+          for (long i = position; ; i++) {
+            if (i >= limit) {
+              throw new ISE("Value overrun");
+            }
+
+            final byte b = memory.getByte(i);
+
+            if (b == StringFieldWriter.VALUE_TERMINATOR) {
+              final int len = Ints.checkedCast(i - position);
+
+              if (len == 0 && NullHandling.replaceWithDefault()) {
+                // Empty strings and nulls are the same in this mode.
+                list.add(null);
+              } else {
+                final ByteBuffer buf = FrameReaderUtils.readByteBuffer(memory, 
position, len);
+                list.add(buf);
               }
+
+              position += len;
+
+              break;
             }
+          }
 
-            break;
+          break;
 
-          default:
-            throw new ISE("Invalid value start byte [%s]", kind);
-        }
+        default:
+          throw new ISE("Invalid value start byte [%s]", kind);
       }
+    }
 
-      if (!rowTerminatorSeen) {
-        throw new ISE("Unexpected end of field");
-      }
+    if (!rowTerminatorSeen) {
+      throw new ISE("Unexpected end of field");
     }
+    return isEffectivelyNull;
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
index 234410bc070..fa17984e9ba 100644
--- 
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
+++ 
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
@@ -19,12 +19,13 @@
 
 package org.apache.druid.query.rowsandcols.concrete;
 
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.field.FieldReader;
+import org.apache.druid.frame.field.FieldReaders;
 import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.frame.read.columnar.FrameColumnReaders;
 import org.apache.druid.frame.segment.FrameStorageAdapter;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
@@ -65,6 +66,7 @@ public class RowBasedFrameRowsAndColumns implements 
RowsAndColumns, AutoCloseabl
   @Override
   public Column findColumn(String name)
   {
+    // Use contains so that we can negative cache.
     if (!colCache.containsKey(name)) {
       final int columnIndex = signature.indexOf(name);
       if (columnIndex < 0) {
@@ -72,9 +74,16 @@ public class RowBasedFrameRowsAndColumns implements 
RowsAndColumns, AutoCloseabl
       } else {
         final ColumnType columnType = signature
             .getColumnType(columnIndex)
-            .orElseThrow(() -> new ISE("just got the id, why is columnType not 
there?"));
+            .orElseThrow(
+                () -> DruidException.defensive(
+                    "just got the id [%s][%s], why is columnType not there?",
+                    columnIndex,
+                    name
+                )
+            );
 
-        colCache.put(name, FrameColumnReaders.create(name, columnIndex, 
columnType).readRACColumn(frame));
+        final FieldReader reader = FieldReaders.create(name, columnType);
+        colCache.put(name, reader.makeRACColumn(frame, signature, name));
       }
     }
     return colCache.get(name);
diff --git 
a/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
 
b/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
index 00fafe53f62..175e9de2a56 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
@@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriterTestData;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
 import org.apache.druid.query.filter.DruidObjectPredicate;
 import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import 
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionDictionarySelector;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.After;
@@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
 
+import java.util.List;
+import java.util.Objects;
+
 public class DoubleFieldReaderTest extends InitializedNullHandlingTest
 {
   private static final long MEMORY_POSITION = 1;
@@ -143,6 +152,27 @@ public class DoubleFieldReaderTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testCompareRows()
+  {
+    final List<Double> rows = 
FrameWriterTestData.TEST_DOUBLES.getData(KeyOrder.ASCENDING);
+
+    final ColumnAccessor accessor =
+        RowBasedFrameRowsAndColumnsTest.MAKER.apply(
+            MapOfColumnsRowsAndColumns.builder()
+                                      .add("dim1", rows.toArray(), 
ColumnType.DOUBLE)
+                                      .build()
+        ).findColumn("dim1").toAccessor();
+
+    for (int i = 1; i < rows.size(); i++) {
+      if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
+        Assert.assertEquals(0, accessor.compareRows(i - 1, i));
+      } else {
+        Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
+      }
+    }
+  }
+
   @Test
   public void test_makeDimensionSelector_aValue()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java 
b/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java
new file mode 100644
index 00000000000..70ff7037b8c
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.SimpleAscendingOffset;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class FieldReaderRACTest extends InitializedNullHandlingTest
+{
+  final DruidExceptionMatcher noArraysMatcher = DruidExceptionMatcher
+      .defensive()
+      .expectMessageIs("Can only work with single-valued strings, should use a 
COMPLEX or ARRAY typed Column instead");
+
+  @Test
+  public void testDataSet() throws IOException
+  {
+    final QueryableIndex index = TestIndex.getMMappedTestIndex();
+    final QueryableIndexStorageAdapter storageAdapter = new 
QueryableIndexStorageAdapter(index);
+    final Frame frame = FrameTestUtil.adapterToFrame(storageAdapter, 
FrameType.ROW_BASED);
+
+    final RowSignature siggy = storageAdapter.getRowSignature();
+    final RowBasedFrameRowsAndColumns rowBasedRAC = new 
RowBasedFrameRowsAndColumns(frame, siggy);
+
+    for (String columnName : siggy.getColumnNames()) {
+      final ColumnHolder colHolder = index.getColumnHolder(columnName);
+      final boolean multiValue = 
colHolder.getCapabilities().hasMultipleValues().isTrue();
+
+      try (BaseColumn col = colHolder.getColumn()) {
+        final ColumnAccessor racCol = 
rowBasedRAC.findColumn(columnName).toAccessor();
+
+        final SimpleAscendingOffset offset = new 
SimpleAscendingOffset(racCol.numRows());
+        final ColumnValueSelector<?> selector = 
col.makeColumnValueSelector(offset);
+        while (offset.withinBounds()) {
+          if (multiValue) {
+            noArraysMatcher.assertThrowsAndMatches(() -> 
racCol.getObject(offset.getOffset()));
+          } else {
+            final Object racObj = racCol.getObject(offset.getOffset());
+            Assert.assertEquals(racCol.isNull(offset.getOffset()), 
racCol.getObject(offset.getOffset()) == null);
+            Assert.assertEquals(selector.getObject(), racObj);
+          }
+          offset.increment();
+        }
+      }
+    }
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
 
b/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
index 4b8d10ca0e0..3669c89232d 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
@@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriterTestData;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
 import org.apache.druid.query.filter.DruidObjectPredicate;
 import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import 
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionDictionarySelector;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.After;
@@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
 
+import java.util.List;
+import java.util.Objects;
+
 public class FloatFieldReaderTest extends InitializedNullHandlingTest
 {
   private static final long MEMORY_POSITION = 1;
@@ -75,6 +84,27 @@ public class FloatFieldReaderTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(NullHandling.sqlCompatible(), 
FloatFieldReader.forPrimitive().isNull(memory, MEMORY_POSITION));
   }
 
+  @Test
+  public void testCompareRows()
+  {
+    final List<Float> rows = 
FrameWriterTestData.TEST_FLOATS.getData(KeyOrder.ASCENDING);
+
+    final ColumnAccessor accessor =
+        RowBasedFrameRowsAndColumnsTest.MAKER.apply(
+            MapOfColumnsRowsAndColumns.builder()
+                                      .add("dim1", rows.toArray(), 
ColumnType.FLOAT)
+                                      .build()
+        ).findColumn("dim1").toAccessor();
+
+    for (int i = 1; i < rows.size(); i++) {
+      if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
+        Assert.assertEquals(0, accessor.compareRows(i - 1, i));
+      } else {
+        Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
+      }
+    }
+  }
+
   @Test
   public void test_isNull_aValue()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
 
b/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
index aab55654e5a..266d86e1283 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
@@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
 
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriterTestData;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
 import org.apache.druid.query.filter.DruidObjectPredicate;
 import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import 
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionDictionarySelector;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.After;
@@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 import org.mockito.quality.Strictness;
 
+import java.util.List;
+import java.util.Objects;
+
 public class LongFieldReaderTest extends InitializedNullHandlingTest
 {
   private static final long MEMORY_POSITION = 1;
@@ -202,6 +211,28 @@ public class LongFieldReaderTest extends 
InitializedNullHandlingTest
     
Assert.assertFalse(readSelector.makeValueMatcher(StringPredicateDruidPredicateFactory.equalTo("2")).matches(false));
   }
 
+
+  @Test
+  public void testCompareRows()
+  {
+    final List<Long> rows = 
FrameWriterTestData.TEST_LONGS.getData(KeyOrder.ASCENDING);
+
+    final ColumnAccessor accessor =
+        RowBasedFrameRowsAndColumnsTest.MAKER.apply(
+            MapOfColumnsRowsAndColumns.builder()
+                                      .add("dim1", rows.toArray(), 
ColumnType.LONG)
+                                      .build()
+        ).findColumn("dim1").toAccessor();
+
+    for (int i = 1; i < rows.size(); i++) {
+      if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
+        Assert.assertEquals(0, accessor.compareRows(i - 1, i));
+      } else {
+        Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
+      }
+    }
+  }
+
   private void writeToMemory(final Long value)
   {
     Mockito.when(writeSelector.isNull()).thenReturn(value == null);
diff --git 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
index 56be3d50f20..7b639b3d48d 100644
--- 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
+++ 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
@@ -24,6 +24,8 @@ import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.ISE;
 import 
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
 import 
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import 
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -67,7 +69,8 @@ public abstract class RowsAndColumnsTestBase
         new Object[]{ConcatRowsAndColumns.class, 
ConcatRowsAndColumnsTest.MAKER},
         new Object[]{RearrangedRowsAndColumns.class, 
RearrangedRowsAndColumnsTest.MAKER},
         new Object[]{ColumnBasedFrameRowsAndColumns.class, 
ColumnBasedFrameRowsAndColumnsTest.MAKER},
-        new Object[]{StorageAdapterRowsAndColumns.class, 
StorageAdapterRowsAndColumnsTest.MAKER}
+        new Object[]{StorageAdapterRowsAndColumns.class, 
StorageAdapterRowsAndColumnsTest.MAKER},
+        new Object[]{RowBasedFrameRowsAndColumns.class, 
RowBasedFrameRowsAndColumnsTest.MAKER}
     );
   }
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
index cd1bb1b81ec..acfcbe6f83e 100644
--- 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
@@ -33,17 +33,13 @@ public class ColumnBasedFrameRowsAndColumnsTest extends 
RowsAndColumnsTestBase
     super(ColumnBasedFrameRowsAndColumns.class);
   }
 
-  public static Function<MapOfColumnsRowsAndColumns, 
ColumnBasedFrameRowsAndColumns> MAKER = input -> {
-
-    return buildFrame(input);
-  };
+  public static Function<MapOfColumnsRowsAndColumns, 
ColumnBasedFrameRowsAndColumns> MAKER = 
ColumnBasedFrameRowsAndColumnsTest::buildFrame;
 
   public static ColumnBasedFrameRowsAndColumns 
buildFrame(MapOfColumnsRowsAndColumns input)
   {
     LazilyDecoratedRowsAndColumns rac = new 
LazilyDecoratedRowsAndColumns(input, null, null, null, 
OffsetLimit.limit(Integer.MAX_VALUE), null, null);
 
     rac.numRows(); // materialize
-
     return (ColumnBasedFrameRowsAndColumns) rac.getBase();
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java
 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java
new file mode 100644
index 00000000000..867e83d9e00
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.concrete;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumnsTestBase;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class RowBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
+{
+  public RowBasedFrameRowsAndColumnsTest()
+  {
+    super(RowBasedFrameRowsAndColumns.class);
+  }
+
+  public static Function<MapOfColumnsRowsAndColumns, 
RowBasedFrameRowsAndColumns> MAKER = 
RowBasedFrameRowsAndColumnsTest::buildFrame;
+
+  private static RowBasedFrameRowsAndColumns 
buildFrame(MapOfColumnsRowsAndColumns rac)
+  {
+    final AtomicInteger rowId = new AtomicInteger(0);
+    final int numRows = rac.numRows();
+    final ColumnSelectorFactoryMaker csfm = 
ColumnSelectorFactoryMaker.fromRAC(rac);
+    final ColumnSelectorFactory selectorFactory = csfm.make(rowId);
+
+    final RowSignature.Builder sigBob = RowSignature.builder();
+    final ArenaMemoryAllocatorFactory memFactory = new 
ArenaMemoryAllocatorFactory(200 << 20);
+
+
+    for (String column : rac.getColumnNames()) {
+      final Column racColumn = rac.findColumn(column);
+      if (racColumn == null) {
+        continue;
+      }
+      sigBob.add(column, racColumn.toAccessor().getType());
+    }
+
+    final RowSignature signature = sigBob.build();
+    final FrameWriter frameWriter = 
FrameWriters.makeRowBasedFrameWriterFactory(
+        memFactory,
+        signature,
+        Collections.emptyList(),
+        false
+    ).newFrameWriter(selectorFactory);
+
+    rowId.set(0);
+    for (; rowId.get() < numRows; rowId.incrementAndGet()) {
+      frameWriter.addSelection();
+    }
+
+    return new 
RowBasedFrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), signature);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to