This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c6da2f30e8f Add fieldReader for row based frames (#16707)
c6da2f30e8f is described below
commit c6da2f30e8f33c570173b7ac649a829173187481
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Aug 13 14:04:41 2024 +0530
Add fieldReader for row based frames (#16707)
Add a new FieldReader#makeRACColumn method for RowBasedFrameRowsAndColumns.
---
.../druid/frame/field/ComplexFieldReader.java | 97 +++++-
.../druid/frame/field/DoubleArrayFieldReader.java | 10 +
.../druid/frame/field/DoubleFieldReader.java | 140 +++++++++
.../druid/frame/field/FieldPositionHelper.java | 65 ++++
.../org/apache/druid/frame/field/FieldReader.java | 12 +-
.../druid/frame/field/FloatArrayFieldReader.java | 10 +
.../apache/druid/frame/field/FloatFieldReader.java | 140 +++++++++
.../druid/frame/field/LongArrayFieldReader.java | 10 +
.../apache/druid/frame/field/LongFieldReader.java | 141 ++++++++-
.../druid/frame/field/NumericArrayFieldReader.java | 2 +-
.../druid/frame/field/NumericFieldReader.java | 7 +-
.../druid/frame/field/StringFieldReader.java | 347 ++++++++++++++++++---
.../concrete/RowBasedFrameRowsAndColumns.java | 17 +-
.../druid/frame/field/DoubleFieldReaderTest.java | 30 ++
.../druid/frame/field/FieldReaderRACTest.java | 80 +++++
.../druid/frame/field/FloatFieldReaderTest.java | 30 ++
.../druid/frame/field/LongFieldReaderTest.java | 31 ++
.../query/rowsandcols/RowsAndColumnsTestBase.java | 5 +-
.../ColumnBasedFrameRowsAndColumnsTest.java | 6 +-
.../concrete/RowBasedFrameRowsAndColumnsTest.java | 80 +++++
20 files changed, 1190 insertions(+), 70 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
index 29bf0945adb..75ad70d5cb4 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/ComplexFieldReader.java
@@ -21,24 +21,32 @@ package org.apache.druid.frame.field;
import com.google.common.base.Preconditions;
import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import
org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.serde.ComplexMetrics;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.util.Comparator;
/**
* Reads values written by {@link ComplexFieldWriter}.
- *
+ * <p>
* Format:
- *
+ * <p>
* - 1 byte: {@link ComplexFieldWriter#NULL_BYTE} or {@link
ComplexFieldWriter#NOT_NULL_BYTE}
* - 4 bytes: length of serialized complex value, little-endian int
* - N bytes: serialized complex value
@@ -121,7 +129,7 @@ public class ComplexFieldReader implements FieldReader
* Alternative interface to read the field from the memory without creating
a selector and field pointer
*/
@Nullable
- public static Object readFieldFromMemory(
+ public static <T> T readFieldFromMemory(
final ComplexMetricSerde serde,
final Memory memory,
final long position
@@ -136,7 +144,8 @@ public class ComplexFieldReader implements FieldReader
final byte[] bytes = new byte[length];
memory.getByteArray(position + ComplexFieldWriter.HEADER_SIZE, bytes, 0,
length);
- return serde.fromBytes(bytes, 0, length);
+ //noinspection unchecked
+ return (T) serde.fromBytes(bytes, 0, length);
} else {
throw new ISE("Unexpected null byte [%s]", nullByte);
}
@@ -166,8 +175,8 @@ public class ComplexFieldReader implements FieldReader
@Override
public T getObject()
{
- //noinspection unchecked
- return (T) readFieldFromMemory(serde, memory, fieldPointer.position());
+ final long fieldPosition = fieldPointer.position();
+ return readFieldFromMemory(serde, memory, fieldPosition);
}
@Override
@@ -183,4 +192,80 @@ public class ComplexFieldReader implements FieldReader
// Do nothing.
}
}
+
+ @Override // Exposes this complex-typed field of the frame as an RAC Column.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ return new ComplexFieldReaderColumn(frame, signature.indexOf(columnName),
signature.size());
+ }
+
+ private class ComplexFieldReaderColumn implements Column // non-static: reads the enclosing reader's serde
+ {
+ private final Frame frame;
+ private final Memory dataRegion; // the frame's ROW_DATA_REGION
+ private final ColumnType type; // COMPLEX<serde type name>
+ private final FieldPositionHelper coach; // computes where this field starts in each row
+
+ public ComplexFieldReaderColumn(Frame frame, int columnIndex, int
numFields)
+ {
+ this.frame = frame;
+ dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.type = ColumnType.ofComplex(serde.getTypeName());
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ObjectColumnAccessorBase() // presumably derives object access/comparison from getVal/getComparator -- confirm
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return type;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum); // first byte of the field is NULL_BYTE or NOT_NULL_BYTE
+ return dataRegion.getByte(fieldPosition) ==
ComplexFieldWriter.NULL_BYTE;
+ }
+
+ @Override
+ protected Object getVal(int rowNum) // deserializes via readFieldFromMemory, which throws ISE on an unexpected indicator byte
+ {
+ return readFieldFromMemory(serde, dataRegion,
coach.computeFieldPosition(rowNum));
+ }
+
+ @Override
+ protected Comparator<Object> getComparator()
+ {
+ return serde.getTypeStrategy(); // the serde's type strategy doubles as the value comparator
+ }
+
+ };
+ }
+
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ return null; // no specialized interfaces are offered
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
index ec7de095e12..2e57a3e9a5c 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/DoubleArrayFieldReader.java
@@ -20,7 +20,11 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@@ -60,4 +64,10 @@ public class DoubleArrayFieldReader extends
NumericArrayFieldReader
}
};
}
+
+ @Override // RAC columns are not supported for double-array fields yet.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ throw NotYetImplemented.ex(null, "Class cannot create an RAC column."); // always throws
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
index 7f7a3f8639e..3afadebe711 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/DoubleFieldReader.java
@@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
/**
* Reads the values produced by {@link DoubleFieldWriter}
*
@@ -99,4 +108,135 @@ public class DoubleFieldReader extends NumericFieldReader
return super._isNull();
}
}
+
+ @Override // Exposes this double field of the frame as an RAC Column.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ return new DoubleFieldReaderColumn(frame, signature.indexOf(columnName),
signature.size());
+ }
+
+ private class DoubleFieldReaderColumn implements Column // non-static: uses the enclosing reader's getNullIndicatorByte()
+ {
+ private final Frame frame;
+ private final Memory dataRegion; // the frame's ROW_DATA_REGION
+ private final FieldPositionHelper coach; // computes where this field starts in each row
+
+ public DoubleFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+ {
+ this.frame = frame;
+ dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ColumnAccessor()
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.DOUBLE;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+ return dataRegion.getByte(fieldPosition) == getNullIndicatorByte(); // first byte of the field is the null indicator
+ }
+
+ @Nullable
+ @Override
+ public Object getObject(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+ if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+ return null;
+ } else {
+ return getDoubleAtPosition(fieldPosition); // autoboxed to Double
+ }
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+ if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+ return 0L; // null reads as 0.0; NOTE(review): long literal widened to double, 0.0 would read more clearly
+ } else {
+ return getDoubleAtPosition(fieldPosition);
+ }
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ return (float) getDouble(rowNum);
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ return (long) getDouble(rowNum); // narrowing cast truncates toward zero
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return (int) getDouble(rowNum); // narrowing cast truncates toward zero
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum) // nulls sort before non-nulls; non-nulls use Double.compare
+ {
+ long lhsPosition = coach.computeFieldPosition(lhsRowNum);
+ long rhsPosition = coach.computeFieldPosition(rhsRowNum);
+
+ final byte nullIndicatorByte = getNullIndicatorByte();
+ if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
+ if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+ return 0;
+ } else {
+ return -1;
+ }
+ } else {
+ if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+ return 1;
+ } else {
+ return Double.compare(getDoubleAtPosition(lhsPosition),
getDoubleAtPosition(rhsPosition));
+ }
+ }
+ }
+
+ private double getDoubleAtPosition(long lhsPosition) // NOTE(review): used for both sides despite the "lhs" parameter name
+ {
+ return // transformed 8-byte value stored right after the 1-byte null indicator
TransformUtils.detransformToDouble(dataRegion.getLong(lhsPosition +
Byte.BYTES));
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ return null; // no specialized interfaces are offered
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java
b/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java
new file mode 100644
index 00000000000..d4abe5300b4
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/frame/field/FieldPositionHelper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+
+/**
+ * Helps compute the field position for a frame from the different regions in
the frame.
+ * <p>
+ * {@link #computeFieldPosition} returns the absolute position, within the data region, of the first byte of one
+ * field (column) of a row. As implemented:
+ * <ul>
+ * <li>the logical row number is mapped to a physical row with {@link Frame#physicalRow};</li>
+ * <li>the row starts at position 0 for physical row 0, and otherwise at the long stored at index (row - 1) of the
+ * offset region (so that region appears to hold each row's end position);</li>
+ * <li>column 0 starts right after a per-row header of {@code numFields} ints;</li>
+ * <li>any other column starts at the row start plus the int stored in header slot (columnIndex - 1), presumably
+ * the end offset, relative to the row start, of the preceding field.</li>
+ * </ul>
+ */
+public class FieldPositionHelper
+{
+ private final Frame frame;
+ private final Memory offsetRegion; // read as longs, one per row
+ private final Memory dataRegion; // holds the row headers and field bytes
+ private final int columnIndex;
+ private final long fieldsBytesSize; // column 0: size of the row header; otherwise: offset of header slot (columnIndex - 1)
+
+ public FieldPositionHelper(
+ Frame frame,
+ Memory offsetRegion,
+ Memory dataRegion,
+ int columnIndex,
+ int numFields
+ )
+ {
+ this.frame = frame;
+ this.offsetRegion = offsetRegion;
+ this.dataRegion = dataRegion;
+ this.columnIndex = columnIndex;
+ this.fieldsBytesSize = this.columnIndex == 0
+ ? ((long) numFields) * Integer.BYTES
+ : ((long) (this.columnIndex - 1)) * Integer.BYTES;
+ }
+
+ public long computeFieldPosition(int rowNum)
+ {
+ rowNum = frame.physicalRow(rowNum); // logical row number -> physical row number
+ final long rowPosition = rowNum == 0 ? 0 : offsetRegion.getLong(((long)
rowNum - 1) * Long.BYTES);
+ final long fieldPosition;
+ if (columnIndex == 0) {
+ fieldPosition = rowPosition + fieldsBytesSize; // first field follows the header directly
+ } else {
+ fieldPosition = rowPosition + dataRegion.getInt(rowPosition +
fieldsBytesSize);
+ }
+ return fieldPosition;
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
index bc9d631361f..8cf0378071d 100644
--- a/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
+++ b/processing/src/main/java/org/apache/druid/frame/field/FieldReader.java
@@ -20,24 +20,32 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
/**
* Embeds the logic to read a specific field from row-based frames or from
{@link RowKey}.
- *
+ * <p>
* Most callers should use {@link org.apache.druid.frame.read.FrameReader} or
* {@link RowKeyReader} rather than using this interface directly.
- *
+ * <p>
* Stateless and immutable.
*/
public interface FieldReader
{
+ /**
+ * Create a {@link Column} which provides access to the rows in the frame,
via the {@link Column#toAccessor()}.
+ * <p>
+ * Existing implementations locate the column with {@code signature.indexOf(columnName)} and
+ * {@code signature.size()}. The numeric array readers do not support this yet and throw {@code NotYetImplemented}.
+ *
+ * @param frame      the frame whose rows are exposed
+ * @param signature  signature of {@code frame}
+ * @param columnName name of the column, within {@code signature}, to expose
+ * @return a column backed by {@code frame}
+ */
+ Column makeRACColumn(Frame frame, RowSignature signature, String columnName);
+
/**
* Create a {@link ColumnValueSelector} backed by some memory and a moveable
pointer.
*/
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
index e97af071824..7252265ba8a 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/FloatArrayFieldReader.java
@@ -20,7 +20,11 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@@ -60,4 +64,10 @@ public class FloatArrayFieldReader extends
NumericArrayFieldReader
}
};
}
+
+ @Override // RAC columns are not supported for float-array fields yet.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ throw NotYetImplemented.ex(null, "Class cannot create an RAC column."); // always throws
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
index 6617d563d67..3fc7213c73e 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/FloatFieldReader.java
@@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.FloatColumnSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
/**
* Reads values written by {@link FloatFieldWriter}.
*
@@ -99,4 +108,135 @@ public class FloatFieldReader extends NumericFieldReader
return super._isNull();
}
}
+
+ @Override // Exposes this float field of the frame as an RAC Column.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ return new FloatFieldReaderColumn(frame, signature.indexOf(columnName),
signature.size());
+ }
+
+ private class FloatFieldReaderColumn implements Column // non-static: uses the enclosing reader's getNullIndicatorByte()
+ {
+ private final Frame frame;
+ private final Memory dataRegion; // the frame's ROW_DATA_REGION
+ private final FieldPositionHelper coach; // computes where this field starts in each row
+
+ public FloatFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+ {
+ this.frame = frame;
+ dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ColumnAccessor()
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.FLOAT;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+ return dataRegion.getByte(fieldPosition) == getNullIndicatorByte(); // first byte of the field is the null indicator
+ }
+
+ @Nullable
+ @Override
+ public Object getObject(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+ if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+ return null;
+ } else {
+ return getFloatAtPosition(fieldPosition); // autoboxed to Float
+ }
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ return getFloat(rowNum);
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+ if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+ return 0L; // null reads as 0.0f; NOTE(review): long literal widened to float, 0f would read more clearly
+ } else {
+ return getFloatAtPosition(fieldPosition);
+ }
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ return (long) getFloat(rowNum); // narrowing cast truncates toward zero
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return (int) getFloat(rowNum); // narrowing cast truncates toward zero
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum) // nulls sort before non-nulls; non-nulls use Float.compare
+ {
+ long lhsPosition = coach.computeFieldPosition(lhsRowNum);
+ long rhsPosition = coach.computeFieldPosition(rhsRowNum);
+
+ final byte nullIndicatorByte = getNullIndicatorByte();
+ if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
+ if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+ return 0;
+ } else {
+ return -1;
+ }
+ } else {
+ if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+ return 1;
+ } else {
+ return Float.compare(getFloatAtPosition(lhsPosition),
getFloatAtPosition(rhsPosition));
+ }
+ }
+ }
+
+ private float getFloatAtPosition(long rhsPosition) // NOTE(review): used for both sides despite the "rhs" parameter name
+ {
+ return // transformed 4-byte value stored right after the 1-byte null indicator
TransformUtils.detransformToFloat(dataRegion.getInt(rhsPosition + Byte.BYTES));
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ return null; // no specialized interfaces are offered
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
index 8f7578c07d3..ee77223aefe 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/LongArrayFieldReader.java
@@ -20,7 +20,11 @@
package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@@ -60,4 +64,10 @@ public class LongArrayFieldReader extends
NumericArrayFieldReader
}
};
}
+
+ @Override // RAC columns are not supported for long-array fields yet.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ throw NotYetImplemented.ex(null, "Class cannot create an RAC column."); // always throws
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
index 8f3bbbf0451..9b514c93087 100644
--- a/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
+++ b/processing/src/main/java/org/apache/druid/frame/field/LongFieldReader.java
@@ -21,11 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.LongColumnSelector;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
/**
* Reads values written by {@link LongFieldWriter}.
*
@@ -68,7 +77,6 @@ public class LongFieldReader extends NumericFieldReader
private static class LongFieldSelector extends NumericFieldReader.Selector
implements LongColumnSelector
{
-
final Memory dataRegion;
final ReadableFieldPointer fieldPointer;
@@ -99,4 +107,135 @@ public class LongFieldReader extends NumericFieldReader
return super._isNull();
}
}
+
+ @Override // Exposes this long field of the frame as an RAC Column.
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ return new LongFieldReaderColumn(frame, signature.indexOf(columnName),
signature.size());
+ }
+
+ private class LongFieldReaderColumn implements Column // non-static: uses the enclosing reader's getNullIndicatorByte()
+ {
+ private final Frame frame;
+ private final Memory dataRegion; // the frame's ROW_DATA_REGION
+ private final FieldPositionHelper coach; // computes where this field starts in each row
+
+ public LongFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+ {
+ this.frame = frame;
+ this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ColumnAccessor()
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.LONG;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+ return dataRegion.getByte(fieldPosition) == getNullIndicatorByte(); // first byte of the field is the null indicator
+ }
+
+ @Nullable
+ @Override
+ public Object getObject(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+ if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+ return null;
+ } else {
+ return getLongAtPosition(fieldPosition); // autoboxed to Long
+ }
+ }
+
+ @Override
+ public double getDouble(int rowNum)
+ {
+ return getLong(rowNum);
+ }
+
+ @Override
+ public float getFloat(int rowNum)
+ {
+ return getLong(rowNum); // NOTE(review): implicit long -> float widening can lose precision for large values
+ }
+
+ @Override
+ public long getLong(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+
+ if (dataRegion.getByte(fieldPosition) == getNullIndicatorByte()) {
+ return 0L; // null reads as 0
+ } else {
+ return getLongAtPosition(fieldPosition);
+ }
+ }
+
+ @Override
+ public int getInt(int rowNum)
+ {
+ return (int) getLong(rowNum); // narrowing cast keeps only the low 32 bits
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum) // nulls sort before non-nulls; non-nulls use Long.compare
+ {
+ long lhsPosition = coach.computeFieldPosition(lhsRowNum);
+ long rhsPosition = coach.computeFieldPosition(rhsRowNum);
+
+ final byte nullIndicatorByte = getNullIndicatorByte();
+ if (dataRegion.getByte(lhsPosition) == nullIndicatorByte) {
+ if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+ return 0;
+ } else {
+ return -1;
+ }
+ } else {
+ if (dataRegion.getByte(rhsPosition) == nullIndicatorByte) {
+ return 1;
+ } else {
+ return Long.compare(getLongAtPosition(lhsPosition),
getLongAtPosition(rhsPosition));
+ }
+ }
+ }
+
+ private long getLongAtPosition(long rhsPosition) // NOTE(review): used for both sides despite the "rhs" parameter name
+ {
+ return // transformed 8-byte value stored right after the 1-byte null indicator
TransformUtils.detransformToLong(dataRegion.getLong(rhsPosition + Byte.BYTES));
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ return null; // no specialized interfaces are offered
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
index 8d6f3958b1e..92dfbc98596 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/NumericArrayFieldReader.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
/**
* Reader class for the fields written by {@link NumericArrayFieldWriter}. See
the Javadoc for the writer for more
* information on the format
- *
+ * <p>
* The numeric array fields are byte comparable
*/
public abstract class NumericArrayFieldReader implements FieldReader
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
index 1e11cfa65f3..bb0cc4faed6 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/NumericFieldReader.java
@@ -50,6 +50,11 @@ public abstract class NumericFieldReader implements
FieldReader
}
}
+ public byte getNullIndicatorByte() // exposes nullIndicatorByte so subclasses' RAC columns can detect null fields
+ {
+ return nullIndicatorByte;
+ }
+
@Override
public ColumnValueSelector<?> makeColumnValueSelector(Memory memory,
ReadableFieldPointer fieldPointer)
{
@@ -94,7 +99,7 @@ public abstract class NumericFieldReader implements
FieldReader
/**
* Helper class which allows the inheritors to fetch the nullity of the
field located at fieldPointer's position in
* the dataRegion.
- *
+ * <p>
* The implementations of the column value selectors returned by the {@link
#getColumnValueSelector} can inherit this
* class and call {@link #_isNull()} in their {@link
ColumnValueSelector#isNull()} to offload the responsibility of
* detecting null elements to this Selector, instead of reworking the null
handling
diff --git
a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
index 2513a2d2444..1c51a914e0d 100644
---
a/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
+++
b/processing/src/main/java/org/apache/druid/frame/field/StringFieldReader.java
@@ -23,51 +23,64 @@ import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.ObjectArrays;
import org.apache.datasketches.memory.Memory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReaderUtils;
+import org.apache.druid.frame.write.RowBasedFrameWriter;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import
org.apache.druid.query.rowsandcols.column.accessor.ObjectColumnAccessorBase;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.DimensionSelectorUtils;
import org.apache.druid.segment.IdLookup;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.RangeIndexedInts;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
/**
* Reads fields written by {@link StringFieldWriter} or {@link
StringArrayFieldWriter}.
- *
+ * <p>
* Strings are written in UTF8 and terminated by {@link
StringFieldWriter#VALUE_TERMINATOR}. Note that this byte
* appears in valid UTF8 encodings if and only if the string contains a NUL
(char 0). Therefore, this field writer
* cannot write out strings containing NUL characters.
- *
+ * <p>
* All rows are terminated by {@link StringFieldWriter#ROW_TERMINATOR}.
- *
+ * <p>
* Empty rows are represented in one byte: solely that {@link
StringFieldWriter#ROW_TERMINATOR}. Rows that are null
* themselves (i.e., a null array) are represented as a {@link
StringFieldWriter#NULL_ROW} followed by a
* {@link StringFieldWriter#ROW_TERMINATOR}. This encoding for null arrays is
decoded by older readers as an
* empty array; null arrays are a feature that did not exist in earlier
versions of the code.
- *
+ * <p>
* Null strings are stored as {@link StringFieldWriter#NULL_BYTE}. All other
strings are prepended by
* {@link StringFieldWriter#NOT_NULL_BYTE} byte to differentiate them from
nulls.
- *
+ * <p>
* This encoding allows the encoded data to be compared as bytes in a way that
matches the behavior of
* {@link
org.apache.druid.segment.StringDimensionHandler#DIMENSION_SELECTOR_COMPARATOR},
except null and
* empty list are not considered equal.
*/
public class StringFieldReader implements FieldReader
{
+ public static final byte[] EXPECTED_BYTES_FOR_NULL = { // presumably the encoding of a single null string: NULL_BYTE, VALUE_TERMINATOR, ROW_TERMINATOR; NOTE(review): public array, so callers could mutate its contents
+ StringFieldWriter.NULL_BYTE, StringFieldWriter.VALUE_TERMINATOR,
StringFieldWriter.ROW_TERMINATOR
+ };
private final boolean asArray;
public StringFieldReader()
@@ -123,6 +136,16 @@ public class StringFieldReader implements FieldReader
}
}
+ @Override
+ public Column makeRACColumn(Frame frame, RowSignature signature, String
columnName)
+ {
+ if (asArray) {
+ return new StringArrayFieldReaderColumn(frame,
signature.indexOf(columnName), signature.size());
+ } else {
+ return new StringFieldReaderColumn(frame, signature.indexOf(columnName),
signature.size());
+ }
+ }
+
/**
* Selector that reads a value from a location pointed to by {@link
ReadableFieldPointer}.
*/
@@ -297,70 +320,296 @@ public class StringFieldReader implements FieldReader
{
currentUtf8StringsIsNull = false;
currentUtf8Strings.clear();
+ currentUtf8StringsIsNull = addStringsToList(memory, fieldPosition,
currentUtf8Strings);
+ }
+ }
- long position = fieldPosition;
- long limit = memory.getCapacity();
+ private static class StringFieldReaderColumn implements Column
+ {
+ private final Frame frame;
+ private final Memory dataRegion;
+ private final FieldPositionHelper coach;
- boolean rowTerminatorSeen = false;
+ public StringFieldReaderColumn(Frame frame, int columnIndex, int numFields)
+ {
+ this.frame = frame;
+ this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
- while (position < limit && !rowTerminatorSeen) {
- final byte kind = memory.getByte(position);
- position++;
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ObjectColumnAccessorBase()
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.STRING;
+ }
- switch (kind) {
- case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte
value)
- if (position == fieldPosition + 1) {
- // It was NULL_ROW.
- currentUtf8StringsIsNull = true;
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+ byte[] nullBytes = new byte[3];
+ dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
+ return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum)
+ {
+ ByteBuffer lhs =
getUTF8BytesAtPosition(coach.computeFieldPosition(lhsRowNum));
+ ByteBuffer rhs =
getUTF8BytesAtPosition(coach.computeFieldPosition(rhsRowNum));
+
+ if (lhs == null) {
+ if (rhs == null) {
+ return 0;
+ } else {
+ return -1;
+ }
+ } else {
+ if (rhs == null) {
+ return 1;
+ } else {
+ return lhs.compareTo(rhs);
}
+ }
+ }
- // Skip; next byte will be a null/not-null byte or a row
terminator.
- break;
+ @Override
+ protected Object getVal(int rowNum)
+ {
+ return getStringAtPosition(coach.computeFieldPosition(rowNum));
+ }
- case StringFieldWriter.ROW_TERMINATOR:
- // Skip; this is the end of the row, so we'll fall through to the
return statement.
- rowTerminatorSeen = true;
- break;
+ @Override
+ protected Comparator<Object> getComparator()
+ {
+ // we implement compareRows and thus don't need to actually
implement this method
+ throw new UnsupportedOperationException();
+ }
- case StringFieldWriter.NULL_BYTE:
- currentUtf8Strings.add(null);
- break;
+ @Nullable
+ private String getStringAtPosition(long fieldPosition)
+ {
+ return
StringUtils.fromUtf8Nullable(getUTF8BytesAtPosition(fieldPosition));
+ }
- case StringFieldWriter.NOT_NULL_BYTE:
- for (long i = position; ; i++) {
- if (i >= limit) {
- throw new ISE("Value overrun");
- }
+ @Nullable
+ private ByteBuffer getUTF8BytesAtPosition(long fieldPosition)
+ {
+ ArrayList<ByteBuffer> buffers = new ArrayList<>();
+ final boolean isNull = addStringsToList(dataRegion, fieldPosition,
buffers);
+ if (isNull) {
+ return null;
+ } else {
+ if (buffers.size() > 1) {
+ throw DruidException.defensive(
+ "Can only work with single-valued strings, should use a
COMPLEX or ARRAY typed Column instead"
+ );
+ }
+ return buffers.get(0);
+ }
+ }
+ };
+ }
- final byte b = memory.getByte(i);
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ return null;
+ }
+ }
- if (b == StringFieldWriter.VALUE_TERMINATOR) {
- final int len = Ints.checkedCast(i - position);
+ private static class StringArrayFieldReaderColumn implements Column
+ {
+ private final Frame frame;
+ private final Memory dataRegion;
+ private final FieldPositionHelper coach;
+
+ public StringArrayFieldReaderColumn(Frame frame, int columnIndex, int
numFields)
+ {
+ this.frame = frame;
+ this.dataRegion = frame.region(RowBasedFrameWriter.ROW_DATA_REGION);
+
+ this.coach = new FieldPositionHelper(
+ frame,
+ frame.region(RowBasedFrameWriter.ROW_OFFSET_REGION),
+ this.dataRegion,
+ columnIndex,
+ numFields
+ );
+ }
+
+ @Nonnull
+ @Override
+ public ColumnAccessor toAccessor()
+ {
+ return new ObjectColumnAccessorBase()
+ {
+ @Override
+ public ColumnType getType()
+ {
+ return ColumnType.STRING_ARRAY;
+ }
+
+ @Override
+ public int numRows()
+ {
+ return frame.numRows();
+ }
+
+ @Override
+ public boolean isNull(int rowNum)
+ {
+ final long fieldPosition = coach.computeFieldPosition(rowNum);
+ byte[] nullBytes = new byte[3];
+ dataRegion.getByteArray(fieldPosition, nullBytes, 0, 3);
+ return Arrays.equals(nullBytes, EXPECTED_BYTES_FOR_NULL);
+ }
+
+ @Override
+ public int compareRows(int lhsRowNum, int rhsRowNum)
+ {
+ throw NotYetImplemented.ex(
+ null,
+ "Should implement this by comparing the actual bytes in the
frame, they should be comparable"
+ );
+ }
+
+ @Override
+ protected Object getVal(int rowNum)
+ {
+ return getStringsAtPosition(coach.computeFieldPosition(rowNum));
+ }
+
+ @Override
+ protected Comparator<Object> getComparator()
+ {
+ // we implement compareRows and thus don't need to actually
implement this method
+ throw new UnsupportedOperationException();
+ }
+
+ @Nullable
+ private List<String> getStringsAtPosition(long fieldPosition)
+ {
+ final List<ByteBuffer> bufs = getUTF8BytesAtPosition(fieldPosition);
+ if (bufs == null) {
+ return null;
+ }
+
+ final ArrayList<String> retVal = new ArrayList<>(bufs.size());
+ for (ByteBuffer buf : bufs) {
+ retVal.add(StringUtils.fromUtf8Nullable(buf));
+ }
+ return retVal;
+ }
- if (len == 0 && NullHandling.replaceWithDefault()) {
- // Empty strings and nulls are the same in this mode.
- currentUtf8Strings.add(null);
- } else {
- final ByteBuffer buf =
FrameReaderUtils.readByteBuffer(memory, position, len);
- currentUtf8Strings.add(buf);
- }
+ @Nullable
+ private List<ByteBuffer> getUTF8BytesAtPosition(long fieldPosition)
+ {
+ ArrayList<ByteBuffer> buffers = new ArrayList<>();
+ final boolean isNull = addStringsToList(dataRegion, fieldPosition,
buffers);
+ if (isNull) {
+ return null;
+ } else {
+ return buffers;
+ }
+ }
+ };
+ }
- position += len;
+ @Nullable
+ @Override
+ public <T> T as(Class<? extends T> clazz)
+ {
+ return null;
+ }
+ }
- break;
+ private static boolean addStringsToList(Memory memory, long fieldPosition,
List<ByteBuffer> list)
+ {
+ long position = fieldPosition;
+ long limit = memory.getCapacity();
+
+ boolean rowTerminatorSeen = false;
+ boolean isEffectivelyNull = false;
+
+ while (position < limit && !rowTerminatorSeen) {
+ final byte kind = memory.getByte(position);
+ position++;
+
+ switch (kind) {
+ case StringFieldWriter.VALUE_TERMINATOR: // Or NULL_ROW (same byte
value)
+ if (position == fieldPosition + 1) {
+ // It was NULL_ROW.
+ isEffectivelyNull = true;
+ }
+
+ // Skip; next byte will be a null/not-null byte or a row terminator.
+ break;
+
+ case StringFieldWriter.ROW_TERMINATOR:
+ // Skip; this is the end of the row, so we'll fall through to the
return statement.
+ rowTerminatorSeen = true;
+ break;
+
+ case StringFieldWriter.NULL_BYTE:
+ list.add(null);
+ break;
+
+ case StringFieldWriter.NOT_NULL_BYTE:
+ for (long i = position; ; i++) {
+ if (i >= limit) {
+ throw new ISE("Value overrun");
+ }
+
+ final byte b = memory.getByte(i);
+
+ if (b == StringFieldWriter.VALUE_TERMINATOR) {
+ final int len = Ints.checkedCast(i - position);
+
+ if (len == 0 && NullHandling.replaceWithDefault()) {
+ // Empty strings and nulls are the same in this mode.
+ list.add(null);
+ } else {
+ final ByteBuffer buf = FrameReaderUtils.readByteBuffer(memory,
position, len);
+ list.add(buf);
}
+
+ position += len;
+
+ break;
}
+ }
- break;
+ break;
- default:
- throw new ISE("Invalid value start byte [%s]", kind);
- }
+ default:
+ throw new ISE("Invalid value start byte [%s]", kind);
}
+ }
- if (!rowTerminatorSeen) {
- throw new ISE("Unexpected end of field");
- }
+ if (!rowTerminatorSeen) {
+ throw new ISE("Unexpected end of field");
}
+ return isEffectivelyNull;
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
index 234410bc070..fa17984e9ba 100644
---
a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
+++
b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java
@@ -19,12 +19,13 @@
package org.apache.druid.query.rowsandcols.concrete;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.field.FieldReader;
+import org.apache.druid.frame.field.FieldReaders;
import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.FrameStorageAdapter;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
@@ -65,6 +66,7 @@ public class RowBasedFrameRowsAndColumns implements
RowsAndColumns, AutoCloseabl
@Override
public Column findColumn(String name)
{
+ // Use contains so that we can negative cache.
if (!colCache.containsKey(name)) {
final int columnIndex = signature.indexOf(name);
if (columnIndex < 0) {
@@ -72,9 +74,16 @@ public class RowBasedFrameRowsAndColumns implements
RowsAndColumns, AutoCloseabl
} else {
final ColumnType columnType = signature
.getColumnType(columnIndex)
- .orElseThrow(() -> new ISE("just got the id, why is columnType not
there?"));
+ .orElseThrow(
+ () -> DruidException.defensive(
+ "just got the id [%s][%s], why is columnType not there?",
+ columnIndex,
+ name
+ )
+ );
- colCache.put(name, FrameColumnReaders.create(name, columnIndex,
columnType).readRACColumn(frame));
+ final FieldReader reader = FieldReaders.create(name, columnType);
+ colCache.put(name, reader.makeRACColumn(frame, signature, name));
}
}
return colCache.get(name);
diff --git
a/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
b/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
index 00fafe53f62..175e9de2a56 100644
---
a/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/field/DoubleFieldReaderTest.java
@@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriterTestData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
+import java.util.List;
+import java.util.Objects;
+
public class DoubleFieldReaderTest extends InitializedNullHandlingTest
{
private static final long MEMORY_POSITION = 1;
@@ -143,6 +152,27 @@ public class DoubleFieldReaderTest extends
InitializedNullHandlingTest
}
}
+ @Test
+ public void testCompareRows()
+ {
+ final List<Double> rows =
FrameWriterTestData.TEST_DOUBLES.getData(KeyOrder.ASCENDING);
+
+ final ColumnAccessor accessor =
+ RowBasedFrameRowsAndColumnsTest.MAKER.apply(
+ MapOfColumnsRowsAndColumns.builder()
+ .add("dim1", rows.toArray(),
ColumnType.DOUBLE)
+ .build()
+ ).findColumn("dim1").toAccessor();
+
+ for (int i = 1; i < rows.size(); i++) {
+ if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
+ Assert.assertEquals(0, accessor.compareRows(i - 1, i));
+ } else {
+ Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
+ }
+ }
+ }
+
@Test
public void test_makeDimensionSelector_aValue()
{
diff --git
a/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java
b/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java
new file mode 100644
index 00000000000..70ff7037b8c
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/frame/field/FieldReaderRACTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.field;
+
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.SimpleAscendingOffset;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class FieldReaderRACTest extends InitializedNullHandlingTest
+{
+ final DruidExceptionMatcher noArraysMatcher = DruidExceptionMatcher
+ .defensive()
+ .expectMessageIs("Can only work with single-valued strings, should use a
COMPLEX or ARRAY typed Column instead");
+
+ @Test
+ public void testDataSet() throws IOException
+ {
+ final QueryableIndex index = TestIndex.getMMappedTestIndex();
+ final QueryableIndexStorageAdapter storageAdapter = new
QueryableIndexStorageAdapter(index);
+ final Frame frame = FrameTestUtil.adapterToFrame(storageAdapter,
FrameType.ROW_BASED);
+
+ final RowSignature siggy = storageAdapter.getRowSignature();
+ final RowBasedFrameRowsAndColumns rowBasedRAC = new
RowBasedFrameRowsAndColumns(frame, siggy);
+
+ for (String columnName : siggy.getColumnNames()) {
+ final ColumnHolder colHolder = index.getColumnHolder(columnName);
+ final boolean multiValue =
colHolder.getCapabilities().hasMultipleValues().isTrue();
+
+ try (BaseColumn col = colHolder.getColumn()) {
+ final ColumnAccessor racCol =
rowBasedRAC.findColumn(columnName).toAccessor();
+
+ final SimpleAscendingOffset offset = new
SimpleAscendingOffset(racCol.numRows());
+ final ColumnValueSelector<?> selector =
col.makeColumnValueSelector(offset);
+ while (offset.withinBounds()) {
+ if (multiValue) {
+ noArraysMatcher.assertThrowsAndMatches(() ->
racCol.getObject(offset.getOffset()));
+ } else {
+ final Object racObj = racCol.getObject(offset.getOffset());
+ Assert.assertEquals(racCol.isNull(offset.getOffset()),
racCol.getObject(offset.getOffset()) == null);
+ Assert.assertEquals(selector.getObject(), racObj);
+ }
+ offset.increment();
+ }
+ }
+ }
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
b/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
index 4b8d10ca0e0..3669c89232d 100644
---
a/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/field/FloatFieldReaderTest.java
@@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriterTestData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
+import java.util.List;
+import java.util.Objects;
+
public class FloatFieldReaderTest extends InitializedNullHandlingTest
{
private static final long MEMORY_POSITION = 1;
@@ -75,6 +84,27 @@ public class FloatFieldReaderTest extends
InitializedNullHandlingTest
Assert.assertEquals(NullHandling.sqlCompatible(),
FloatFieldReader.forPrimitive().isNull(memory, MEMORY_POSITION));
}
+ @Test
+ public void testCompareRows()
+ {
+ final List<Float> rows =
FrameWriterTestData.TEST_FLOATS.getData(KeyOrder.ASCENDING);
+
+ final ColumnAccessor accessor =
+ RowBasedFrameRowsAndColumnsTest.MAKER.apply(
+ MapOfColumnsRowsAndColumns.builder()
+ .add("dim1", rows.toArray(),
ColumnType.FLOAT)
+ .build()
+ ).findColumn("dim1").toAccessor();
+
+ for (int i = 1; i < rows.size(); i++) {
+ if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
+ Assert.assertEquals(0, accessor.compareRows(i - 1, i));
+ } else {
+ Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
+ }
+ }
+ }
+
@Test
public void test_isNull_aValue()
{
diff --git
a/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
b/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
index aab55654e5a..266d86e1283 100644
---
a/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/field/LongFieldReaderTest.java
@@ -21,14 +21,20 @@ package org.apache.druid.frame.field;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.frame.write.FrameWriterTestData;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.DruidObjectPredicate;
import org.apache.druid.query.filter.StringPredicateDruidPredicateFactory;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
+import
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.After;
@@ -42,6 +48,9 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
+import java.util.List;
+import java.util.Objects;
+
public class LongFieldReaderTest extends InitializedNullHandlingTest
{
private static final long MEMORY_POSITION = 1;
@@ -202,6 +211,28 @@ public class LongFieldReaderTest extends
InitializedNullHandlingTest
Assert.assertFalse(readSelector.makeValueMatcher(StringPredicateDruidPredicateFactory.equalTo("2")).matches(false));
}
+
+ @Test
+ public void testCompareRows()
+ {
+ final List<Long> rows =
FrameWriterTestData.TEST_LONGS.getData(KeyOrder.ASCENDING);
+
+ final ColumnAccessor accessor =
+ RowBasedFrameRowsAndColumnsTest.MAKER.apply(
+ MapOfColumnsRowsAndColumns.builder()
+ .add("dim1", rows.toArray(),
ColumnType.LONG)
+ .build()
+ ).findColumn("dim1").toAccessor();
+
+ for (int i = 1; i < rows.size(); i++) {
+ if (Objects.equals(accessor.getObject(i - 1), accessor.getObject(i))) {
+ Assert.assertEquals(0, accessor.compareRows(i - 1, i));
+ } else {
+ Assert.assertTrue(accessor.compareRows(i - 1, i) < 0);
+ }
+ }
+ }
+
private void writeToMemory(final Long value)
{
Mockito.when(writeSelector.isNull()).thenReturn(value == null);
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
index 56be3d50f20..7b639b3d48d 100644
---
a/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/RowsAndColumnsTestBase.java
@@ -24,6 +24,8 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import
org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
+import
org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumnsTest;
import org.junit.Assert;
import org.junit.Test;
@@ -67,7 +69,8 @@ public abstract class RowsAndColumnsTestBase
new Object[]{ConcatRowsAndColumns.class,
ConcatRowsAndColumnsTest.MAKER},
new Object[]{RearrangedRowsAndColumns.class,
RearrangedRowsAndColumnsTest.MAKER},
new Object[]{ColumnBasedFrameRowsAndColumns.class,
ColumnBasedFrameRowsAndColumnsTest.MAKER},
- new Object[]{StorageAdapterRowsAndColumns.class,
StorageAdapterRowsAndColumnsTest.MAKER}
+ new Object[]{StorageAdapterRowsAndColumns.class,
StorageAdapterRowsAndColumnsTest.MAKER},
+ new Object[]{RowBasedFrameRowsAndColumns.class,
RowBasedFrameRowsAndColumnsTest.MAKER}
);
}
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
index cd1bb1b81ec..acfcbe6f83e 100644
---
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java
@@ -33,17 +33,13 @@ public class ColumnBasedFrameRowsAndColumnsTest extends
RowsAndColumnsTestBase
super(ColumnBasedFrameRowsAndColumns.class);
}
- public static Function<MapOfColumnsRowsAndColumns,
ColumnBasedFrameRowsAndColumns> MAKER = input -> {
-
- return buildFrame(input);
- };
+ public static Function<MapOfColumnsRowsAndColumns,
ColumnBasedFrameRowsAndColumns> MAKER =
ColumnBasedFrameRowsAndColumnsTest::buildFrame;
public static ColumnBasedFrameRowsAndColumns
buildFrame(MapOfColumnsRowsAndColumns input)
{
LazilyDecoratedRowsAndColumns rac = new
LazilyDecoratedRowsAndColumns(input, null, null, null,
OffsetLimit.limit(Integer.MAX_VALUE), null, null);
rac.numRows(); // materialize
-
return (ColumnBasedFrameRowsAndColumns) rac.getBase();
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java
new file mode 100644
index 00000000000..867e83d9e00
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumnsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.concrete;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumnsTestBase;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+public class RowBasedFrameRowsAndColumnsTest extends RowsAndColumnsTestBase
+{
+ public RowBasedFrameRowsAndColumnsTest()
+ {
+ super(RowBasedFrameRowsAndColumns.class);
+ }
+
+ public static Function<MapOfColumnsRowsAndColumns,
RowBasedFrameRowsAndColumns> MAKER =
RowBasedFrameRowsAndColumnsTest::buildFrame;
+
+ private static RowBasedFrameRowsAndColumns
buildFrame(MapOfColumnsRowsAndColumns rac)
+ {
+ final AtomicInteger rowId = new AtomicInteger(0);
+ final int numRows = rac.numRows();
+ final ColumnSelectorFactoryMaker csfm =
ColumnSelectorFactoryMaker.fromRAC(rac);
+ final ColumnSelectorFactory selectorFactory = csfm.make(rowId);
+
+ final RowSignature.Builder sigBob = RowSignature.builder();
+ final ArenaMemoryAllocatorFactory memFactory = new
ArenaMemoryAllocatorFactory(200 << 20);
+
+
+ for (String column : rac.getColumnNames()) {
+ final Column racColumn = rac.findColumn(column);
+ if (racColumn == null) {
+ continue;
+ }
+ sigBob.add(column, racColumn.toAccessor().getType());
+ }
+
+ final RowSignature signature = sigBob.build();
+ final FrameWriter frameWriter =
FrameWriters.makeRowBasedFrameWriterFactory(
+ memFactory,
+ signature,
+ Collections.emptyList(),
+ false
+ ).newFrameWriter(selectorFactory);
+
+ rowId.set(0);
+ for (; rowId.get() < numRows; rowId.incrementAndGet()) {
+ frameWriter.addSelection();
+ }
+
+ return new
RowBasedFrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), signature);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]