This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 93920b1 Unify data type conversion and formatting (#6728)
93920b1 is described below
commit 93920b17f9dc06c4d9b9baeb6006ab780afed8dd
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Apr 1 21:32:31 2021 -0700
Unify data type conversion and formatting (#6728)
Unify the logic of data type conversion and formatting into
`PinotDataType`, `DataType`, `ColumnDataType`.
- `PinotDataType` is used for type conversion
- `DataType` is used in schema to represent the source data type (only has
single-valued types)
- `ColumnDataType` is used in `DataTable` and `ResultTable` to represent
the result type
`BYTES` type is stored as `byte[]` externally but as `ByteArray` internally
for hashing and sorting purposes. This PR handles the conversion of `BYTES` type
(also new types to be added in the future) so that the result can be passed
to the UDF for post-aggregation calculations.
Also adding primitive array types to `PinotDataType`.
We should consider merging `DataType` and `ColumnDataType` in the future.
---
.../pinot/common/function/FunctionInvoker.java | 19 +-
.../pinot/common/function/FunctionUtils.java | 24 +-
.../common/function/scalar/StringFunctions.java | 2 +-
.../org/apache/pinot/common/utils/DataSchema.java | 181 ++++++++++++-
.../apache/pinot/common/utils/PinotDataType.java | 279 +++++++++++++++------
.../pinot/common/utils/request/RequestUtils.java | 48 +---
.../pinot/common/utils/PinotDataTypeTest.java | 9 +-
.../recordtransformer/DataTypeTransformer.java | 2 +-
.../JsonExtractScalarTransformFunction.java | 68 ++---
.../function/ScalarTransformFunctionWrapper.java | 37 +--
.../postaggregation/PostAggregationFunction.java | 14 +-
.../query/pruner/ColumnValueSegmentPruner.java | 21 +-
.../query/reduce/AggregationDataTableReducer.java | 27 +-
.../query/reduce/DistinctDataTableReducer.java | 14 +-
.../core/query/reduce/GroupByDataTableReducer.java | 30 ++-
.../core/query/reduce/HavingFilterHandler.java | 26 +-
.../query/selection/SelectionOperatorService.java | 40 +--
.../query/selection/SelectionOperatorUtils.java | 91 +------
.../core/query/reduce/HavingFilterHandlerTest.java | 22 +-
.../apache/pinot/queries/StUnionQueriesTest.java | 59 +++--
.../pinot/queries/SumPrecisionQueriesTest.java | 72 ++++--
.../java/org/apache/pinot/spi/data/FieldSpec.java | 116 ++++++---
22 files changed, 738 insertions(+), 463 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
index aec453c..b185d26 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
@@ -23,7 +23,6 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.common.utils.PinotDataType;
@@ -109,23 +108,7 @@ public class FunctionInvoker {
PinotDataType argumentType =
FunctionUtils.getArgumentType(argumentClass);
Preconditions.checkArgument(parameterType != null && argumentType !=
null,
"Cannot convert value from class: %s to class: %s", argumentClass,
parameterClass);
- Object convertedArgument = parameterType.convert(argument, argumentType);
- // For primitive array parameter, convert the argument from Object array
to primitive array
- switch (parameterType) {
- case INTEGER_ARRAY:
- convertedArgument = ArrayUtils.toPrimitive((Integer[])
convertedArgument);
- break;
- case LONG_ARRAY:
- convertedArgument = ArrayUtils.toPrimitive((Long[])
convertedArgument);
- break;
- case FLOAT_ARRAY:
- convertedArgument = ArrayUtils.toPrimitive((Float[])
convertedArgument);
- break;
- case DOUBLE_ARRAY:
- convertedArgument = ArrayUtils.toPrimitive((Double[])
convertedArgument);
- break;
- }
- arguments[i] = convertedArgument;
+ arguments[i] = parameterType.convert(argument, argumentType);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
index 1b04ff7..000e960 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java
@@ -42,10 +42,10 @@ public class FunctionUtils {
put(Double.class, PinotDataType.DOUBLE);
put(String.class, PinotDataType.STRING);
put(byte[].class, PinotDataType.BYTES);
- put(int[].class, PinotDataType.INTEGER_ARRAY);
- put(long[].class, PinotDataType.LONG_ARRAY);
- put(float[].class, PinotDataType.FLOAT_ARRAY);
- put(double[].class, PinotDataType.DOUBLE_ARRAY);
+ put(int[].class, PinotDataType.PRIMITIVE_INT_ARRAY);
+ put(long[].class, PinotDataType.PRIMITIVE_LONG_ARRAY);
+ put(float[].class, PinotDataType.PRIMITIVE_FLOAT_ARRAY);
+ put(double[].class, PinotDataType.PRIMITIVE_DOUBLE_ARRAY);
put(String[].class, PinotDataType.STRING_ARRAY);
}};
@@ -61,13 +61,13 @@ public class FunctionUtils {
put(Double.class, PinotDataType.DOUBLE);
put(String.class, PinotDataType.STRING);
put(byte[].class, PinotDataType.BYTES);
- put(int[].class, PinotDataType.INTEGER_ARRAY);
+ put(int[].class, PinotDataType.PRIMITIVE_INT_ARRAY);
put(Integer[].class, PinotDataType.INTEGER_ARRAY);
- put(long[].class, PinotDataType.LONG_ARRAY);
+ put(long[].class, PinotDataType.PRIMITIVE_LONG_ARRAY);
put(Long[].class, PinotDataType.LONG_ARRAY);
- put(float[].class, PinotDataType.FLOAT_ARRAY);
+ put(float[].class, PinotDataType.PRIMITIVE_FLOAT_ARRAY);
put(Float[].class, PinotDataType.FLOAT_ARRAY);
- put(double[].class, PinotDataType.DOUBLE_ARRAY);
+ put(double[].class, PinotDataType.PRIMITIVE_DOUBLE_ARRAY);
put(Double[].class, PinotDataType.DOUBLE_ARRAY);
put(String[].class, PinotDataType.STRING_ARRAY);
}};
@@ -84,13 +84,9 @@ public class FunctionUtils {
put(String.class, DataType.STRING);
put(byte[].class, DataType.BYTES);
put(int[].class, DataType.INT);
- put(Integer[].class, DataType.INT);
put(long[].class, DataType.LONG);
- put(Long[].class, DataType.LONG);
put(float[].class, DataType.FLOAT);
- put(Float[].class, DataType.FLOAT);
put(double[].class, DataType.DOUBLE);
- put(Double[].class, DataType.DOUBLE);
put(String[].class, DataType.STRING);
}};
@@ -106,13 +102,9 @@ public class FunctionUtils {
put(String.class, ColumnDataType.STRING);
put(byte[].class, ColumnDataType.BYTES);
put(int[].class, ColumnDataType.INT_ARRAY);
- put(Integer[].class, ColumnDataType.INT_ARRAY);
put(long[].class, ColumnDataType.LONG_ARRAY);
- put(Long[].class, ColumnDataType.LONG_ARRAY);
put(float[].class, ColumnDataType.FLOAT_ARRAY);
- put(Float[].class, ColumnDataType.FLOAT_ARRAY);
put(double[].class, ColumnDataType.DOUBLE_ARRAY);
- put(Double[].class, ColumnDataType.DOUBLE_ARRAY);
put(String[].class, ColumnDataType.STRING_ARRAY);
}};
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
index eded7cb..6cbc460 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/StringFunctions.java
@@ -348,7 +348,7 @@ public class StringFunctions {
* @return returns true if substring present in main string else false.
*/
@ScalarFunction
- public static Boolean contains(String input, String substring) {
+ public static boolean contains(String input, String substring) {
return input.contains(substring);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 8ba9ca4..68d1301 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -23,8 +23,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.EqualityUtils;
@@ -175,7 +178,7 @@ public class DataSchema {
return new DataSchema(columnNames, columnDataTypes);
}
- @SuppressWarnings("CloneDoesntCallSuperClone")
+ @SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public DataSchema clone() {
return new DataSchema(_columnNames.clone(), _columnDataTypes.clone());
@@ -241,11 +244,179 @@ public class DataSchema {
this.isNumberArray() && anotherColumnDataType.isNumberArray());
}
- public static ColumnDataType fromDataType(FieldSpec.DataType dataType,
boolean isSingleValue) {
+ public DataType toDataType() {
+ switch (this) {
+ case INT:
+ return DataType.INT;
+ case LONG:
+ return DataType.LONG;
+ case FLOAT:
+ return DataType.FLOAT;
+ case DOUBLE:
+ return DataType.DOUBLE;
+ case STRING:
+ return DataType.STRING;
+ case BYTES:
+ return DataType.BYTES;
+ default:
+ throw new IllegalStateException(String.format("Cannot convert
ColumnDataType: %s to DataType", this));
+ }
+ }
+
+ /**
+ * Converts the given internal value to the type for external use (e.g. as
UDF argument). The given value should be
+ * compatible with the type.
+ */
+ public Serializable convert(Object value) {
+ switch (this) {
+ case INT:
+ return ((Number) value).intValue();
+ case LONG:
+ return ((Number) value).longValue();
+ case FLOAT:
+ return ((Number) value).floatValue();
+ case DOUBLE:
+ return ((Number) value).doubleValue();
+ case STRING:
+ return value.toString();
+ case BYTES:
+ return ((ByteArray) value).getBytes();
+ case INT_ARRAY:
+ return (int[]) value;
+ case LONG_ARRAY:
+ if (value instanceof long[]) {
+ return (long[]) value;
+ } else {
+ int[] intValues = (int[]) value;
+ int length = intValues.length;
+ long[] longValues = new long[length];
+ for (int i = 0; i < length; i++) {
+ longValues[i] = intValues[i];
+ }
+ return longValues;
+ }
+ case FLOAT_ARRAY:
+ return (float[]) value;
+ case DOUBLE_ARRAY:
+ if (value instanceof double[]) {
+ return (double[]) value;
+ } else if (value instanceof int[]) {
+ int[] intValues = (int[]) value;
+ int length = intValues.length;
+ double[] doubleValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleValues[i] = intValues[i];
+ }
+ return doubleValues;
+ } else if (value instanceof long[]) {
+ long[] longValues = (long[]) value;
+ int length = longValues.length;
+ double[] doubleValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleValues[i] = longValues[i];
+ }
+ return doubleValues;
+ } else {
+ float[] floatValues = (float[]) value;
+ int length = floatValues.length;
+ double[] doubleValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleValues[i] = floatValues[i];
+ }
+ return doubleValues;
+ }
+ case STRING_ARRAY:
+ return (String[]) value;
+ default:
+ throw new IllegalStateException(String.format("Cannot convert: '%s'
to type: %s", value, this));
+ }
+ }
+
+ /**
+ * Formats the value to human-readable format based on the type to be used
in the query response.
+ */
+ public Serializable format(Object value) {
+ switch (this) {
+ case BYTES:
+ return BytesUtils.toHexString((byte[]) value);
+ default:
+ return (Serializable) value;
+ }
+ }
+
+ /**
+ * Equivalent to {@link #convert(Object)} and {@link #format(Object)} with
a single switch statement.
+ */
+ public Serializable convertAndFormat(Object value) {
+ switch (this) {
+ case INT:
+ return ((Number) value).intValue();
+ case LONG:
+ return ((Number) value).longValue();
+ case FLOAT:
+ return ((Number) value).floatValue();
+ case DOUBLE:
+ return ((Number) value).doubleValue();
+ case STRING:
+ return value.toString();
+ case BYTES:
+ return ((ByteArray) value).toHexString();
+ case INT_ARRAY:
+ return (int[]) value;
+ case LONG_ARRAY:
+ if (value instanceof long[]) {
+ return (long[]) value;
+ } else {
+ int[] intValues = (int[]) value;
+ int length = intValues.length;
+ long[] longValues = new long[length];
+ for (int i = 0; i < length; i++) {
+ longValues[i] = intValues[i];
+ }
+ return longValues;
+ }
+ case FLOAT_ARRAY:
+ return (float[]) value;
+ case DOUBLE_ARRAY:
+ if (value instanceof double[]) {
+ return (double[]) value;
+ } else if (value instanceof int[]) {
+ int[] intValues = (int[]) value;
+ int length = intValues.length;
+ double[] doubleValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleValues[i] = intValues[i];
+ }
+ return doubleValues;
+ } else if (value instanceof long[]) {
+ long[] longValues = (long[]) value;
+ int length = longValues.length;
+ double[] doubleValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleValues[i] = longValues[i];
+ }
+ return doubleValues;
+ } else {
+ float[] floatValues = (float[]) value;
+ int length = floatValues.length;
+ double[] doubleValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleValues[i] = floatValues[i];
+ }
+ return doubleValues;
+ }
+ case STRING_ARRAY:
+ return (String[]) value;
+ default:
+ throw new IllegalStateException(String.format("Cannot convert and
format: '%s' to type: %s", value, this));
+ }
+ }
+
+ public static ColumnDataType fromDataType(DataType dataType, boolean
isSingleValue) {
return isSingleValue ? fromDataTypeSV(dataType) :
fromDataTypeMV(dataType);
}
- public static ColumnDataType fromDataTypeSV(FieldSpec.DataType dataType) {
+ public static ColumnDataType fromDataTypeSV(DataType dataType) {
switch (dataType) {
case INT:
return INT;
@@ -264,7 +435,7 @@ public class DataSchema {
}
}
- public static ColumnDataType fromDataTypeMV(FieldSpec.DataType dataType) {
+ public static ColumnDataType fromDataTypeMV(DataType dataType) {
switch (dataType) {
case INT:
return INT_ARRAY;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 7325318..95cdf81 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BytesUtils;
@@ -28,7 +29,7 @@ import org.apache.pinot.spi.utils.BytesUtils;
* The <code>PinotDataType</code> enum represents the data type of a value in
a row from recordReader and provides
* utility methods to convert value across types if applicable.
* <p>We don't use <code>PinotDataType</code> to maintain type information,
but use it to help organize the data and
- * use {@link FieldSpec.DataType} to maintain type information separately
across various readers.
+ * use {@link DataType} to maintain type information separately across
various readers.
* <p>NOTE:
* <ul>
* <li>We will silently lose information if a conversion causes us to do so
(e.g. DOUBLE to INT)</li>
@@ -40,22 +41,22 @@ public enum PinotDataType {
BOOLEAN {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Boolean) value) ? 1 : 0;
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Boolean) value) ? 1L : 0L;
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Boolean) value) ? 1f : 0f;
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Boolean) value) ? 1d : 0d;
}
@@ -72,22 +73,22 @@ public enum PinotDataType {
BYTE {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Byte) value).intValue();
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Byte) value).longValue();
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Byte) value).floatValue();
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Byte) value).doubleValue();
}
@@ -104,22 +105,22 @@ public enum PinotDataType {
CHARACTER {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return (int) ((Character) value);
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return (long) ((Character) value);
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return (float) ((Character) value);
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return (double) ((Character) value);
}
@@ -136,22 +137,22 @@ public enum PinotDataType {
SHORT {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Short) value).intValue();
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Short) value).longValue();
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Short) value).floatValue();
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Short) value).doubleValue();
}
@@ -168,22 +169,22 @@ public enum PinotDataType {
INTEGER {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return (Integer) value;
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Integer) value).longValue();
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Integer) value).floatValue();
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Integer) value).doubleValue();
}
@@ -199,28 +200,28 @@ public enum PinotDataType {
@Override
public Integer convert(Object value, PinotDataType sourceType) {
- return sourceType.toInteger(value);
+ return sourceType.toInt(value);
}
},
LONG {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Long) value).intValue();
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return (Long) value;
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Long) value).floatValue();
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Long) value).doubleValue();
}
@@ -242,22 +243,22 @@ public enum PinotDataType {
FLOAT {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Float) value).intValue();
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Float) value).longValue();
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return (Float) value;
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Float) value).doubleValue();
}
@@ -279,22 +280,22 @@ public enum PinotDataType {
DOUBLE {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Double) value).intValue();
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Double) value).longValue();
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Double) value).floatValue();
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return (Double) value;
}
@@ -316,35 +317,35 @@ public enum PinotDataType {
STRING {
@Override
- public Integer toInteger(Object value) {
- return Integer.valueOf(((String) value).trim());
+ public int toInt(Object value) {
+ return Integer.parseInt(value.toString().trim());
}
@Override
- public Long toLong(Object value) {
- return Long.valueOf(((String) value).trim());
+ public long toLong(Object value) {
+ return Long.parseLong(value.toString().trim());
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
// NOTE: No need to trim here because Float.valueOf() will trim the
string
- return Float.valueOf((String) value);
+ return Float.parseFloat(value.toString());
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
// NOTE: No need to trim here because Double.valueOf() will trim the
string
- return Double.valueOf((String) value);
+ return Double.parseDouble(value.toString());
}
@Override
public String toString(Object value) {
- return (String) value;
+ return value.toString();
}
@Override
public byte[] toBytes(Object value) {
- return BytesUtils.toBytes(((String) value).trim());
+ return BytesUtils.toBytes(value.toString().trim());
}
@Override
@@ -355,22 +356,22 @@ public enum PinotDataType {
BYTES {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
throw new UnsupportedOperationException("Cannot convert value from BYTES
to INTEGER");
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
throw new UnsupportedOperationException("Cannot convert value from BYTES
to LONG");
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
throw new UnsupportedOperationException("Cannot convert value from BYTES
to FLOAT");
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
throw new UnsupportedOperationException("Cannot convert value from BYTES
to DOUBLE");
}
@@ -392,22 +393,22 @@ public enum PinotDataType {
OBJECT {
@Override
- public Integer toInteger(Object value) {
+ public int toInt(Object value) {
return ((Number) value).intValue();
}
@Override
- public Long toLong(Object value) {
+ public long toLong(Object value) {
return ((Number) value).longValue();
}
@Override
- public Float toFloat(Object value) {
+ public float toFloat(Object value) {
return ((Number) value).floatValue();
}
@Override
- public Double toDouble(Object value) {
+ public double toDouble(Object value) {
return ((Number) value).doubleValue();
}
@@ -439,6 +440,20 @@ public enum PinotDataType {
SHORT_ARRAY,
+ /*
+ NOTE:
+ Primitive array is used in query execution, query response, scalar
function arguments and return values.
+ Object array is used in GenericRow for data ingestion.
+ We need to keep them separately because they cannot automatically cast
to the other type.
+ */
+
+ PRIMITIVE_INT_ARRAY {
+ @Override
+ public int[] convert(Object value, PinotDataType sourceType) {
+ return sourceType.toPrimitiveIntArray(value);
+ }
+ },
+
INTEGER_ARRAY {
@Override
public Integer[] convert(Object value, PinotDataType sourceType) {
@@ -446,6 +461,13 @@ public enum PinotDataType {
}
},
+ PRIMITIVE_LONG_ARRAY {
+ @Override
+ public long[] convert(Object value, PinotDataType sourceType) {
+ return sourceType.toPrimitiveLongArray(value);
+ }
+ },
+
LONG_ARRAY {
@Override
public Long[] convert(Object value, PinotDataType sourceType) {
@@ -453,6 +475,13 @@ public enum PinotDataType {
}
},
+ PRIMITIVE_FLOAT_ARRAY {
+ @Override
+ public float[] convert(Object value, PinotDataType sourceType) {
+ return sourceType.toPrimitiveFloatArray(value);
+ }
+ },
+
FLOAT_ARRAY {
@Override
public Float[] convert(Object value, PinotDataType sourceType) {
@@ -460,6 +489,13 @@ public enum PinotDataType {
}
},
+ PRIMITIVE_DOUBLE_ARRAY {
+ @Override
+ public double[] convert(Object value, PinotDataType sourceType) {
+ return sourceType.toPrimitiveDoubleArray(value);
+ }
+ },
+
DOUBLE_ARRAY {
@Override
public Double[] convert(Object value, PinotDataType sourceType) {
@@ -477,49 +513,91 @@ public enum PinotDataType {
OBJECT_ARRAY;
/**
- * NOTE: override toInteger(), toLong(), toFloat(), toDouble(), toString()
and toBytes() for single-value types.
+ * NOTE: override toInt(), toLong(), toFloat(), toDouble(), toString() and
toBytes() for single-value types.
*/
- public Integer toInteger(Object value) {
- return getSingleValueType().toInteger(((Object[]) value)[0]);
+ public int toInt(Object value) {
+ return getSingleValueType().toInt(toObjectArray(value)[0]);
}
- public Long toLong(Object value) {
- return getSingleValueType().toLong(((Object[]) value)[0]);
+ public long toLong(Object value) {
+ return getSingleValueType().toLong(toObjectArray(value)[0]);
}
- public Float toFloat(Object value) {
- return getSingleValueType().toFloat(((Object[]) value)[0]);
+ public float toFloat(Object value) {
+ return getSingleValueType().toFloat(toObjectArray(value)[0]);
}
- public Double toDouble(Object value) {
- return getSingleValueType().toDouble(((Object[]) value)[0]);
+ public double toDouble(Object value) {
+ return getSingleValueType().toDouble(toObjectArray(value)[0]);
}
public String toString(Object value) {
- return getSingleValueType().toString(((Object[]) value)[0]);
+ return getSingleValueType().toString(toObjectArray(value)[0]);
}
public byte[] toBytes(Object value) {
- return getSingleValueType().toBytes(((Object[]) value)[0]);
+ return getSingleValueType().toBytes(toObjectArray(value)[0]);
+ }
+
+ public int[] toPrimitiveIntArray(Object value) {
+ if (value instanceof int[]) {
+ return (int[]) value;
+ }
+ if (isSingleValue()) {
+ return new int[]{toInt(value)};
+ } else {
+ Object[] valueArray = toObjectArray(value);
+ int length = valueArray.length;
+ int[] intArray = new int[length];
+ PinotDataType singleValueType = getSingleValueType();
+ for (int i = 0; i < length; i++) {
+ intArray[i] = singleValueType.toInt(valueArray[i]);
+ }
+ return intArray;
+ }
}
public Integer[] toIntegerArray(Object value) {
+ if (value instanceof Integer[]) {
+ return (Integer[]) value;
+ }
if (isSingleValue()) {
- return new Integer[]{toInteger(value)};
+ return new Integer[]{toInt(value)};
} else {
Object[] valueArray = toObjectArray(value);
int length = valueArray.length;
Integer[] integerArray = new Integer[length];
PinotDataType singleValueType = getSingleValueType();
for (int i = 0; i < length; i++) {
- integerArray[i] = singleValueType.toInteger(valueArray[i]);
+ integerArray[i] = singleValueType.toInt(valueArray[i]);
}
return integerArray;
}
}
+ public long[] toPrimitiveLongArray(Object value) {
+ if (value instanceof long[]) {
+ return (long[]) value;
+ }
+ if (isSingleValue()) {
+ return new long[]{toLong(value)};
+ } else {
+ Object[] valueArray = toObjectArray(value);
+ int length = valueArray.length;
+ long[] longArray = new long[length];
+ PinotDataType singleValueType = getSingleValueType();
+ for (int i = 0; i < length; i++) {
+ longArray[i] = singleValueType.toLong(valueArray[i]);
+ }
+ return longArray;
+ }
+ }
+
public Long[] toLongArray(Object value) {
+ if (value instanceof Long[]) {
+ return (Long[]) value;
+ }
if (isSingleValue()) {
return new Long[]{toLong(value)};
} else {
@@ -534,7 +612,28 @@ public enum PinotDataType {
}
}
+ public float[] toPrimitiveFloatArray(Object value) {
+ if (value instanceof float[]) {
+ return (float[]) value;
+ }
+ if (isSingleValue()) {
+ return new float[]{toFloat(value)};
+ } else {
+ Object[] valueArray = toObjectArray(value);
+ int length = valueArray.length;
+ float[] floatArray = new float[length];
+ PinotDataType singleValueType = getSingleValueType();
+ for (int i = 0; i < length; i++) {
+ floatArray[i] = singleValueType.toFloat(valueArray[i]);
+ }
+ return floatArray;
+ }
+ }
+
public Float[] toFloatArray(Object value) {
+ if (value instanceof Float[]) {
+ return (Float[]) value;
+ }
if (isSingleValue()) {
return new Float[]{toFloat(value)};
} else {
@@ -549,7 +648,28 @@ public enum PinotDataType {
}
}
+ public double[] toPrimitiveDoubleArray(Object value) {
+ if (value instanceof double[]) {
+ return (double[]) value;
+ }
+ if (isSingleValue()) {
+ return new double[]{toDouble(value)};
+ } else {
+ Object[] valueArray = toObjectArray(value);
+ int length = valueArray.length;
+ double[] doubleArray = new double[length];
+ PinotDataType singleValueType = getSingleValueType();
+ for (int i = 0; i < length; i++) {
+ doubleArray[i] = singleValueType.toDouble(valueArray[i]);
+ }
+ return doubleArray;
+ }
+ }
+
public Double[] toDoubleArray(Object value) {
+ if (value instanceof Double[]) {
+ return (Double[]) value;
+ }
if (isSingleValue()) {
return new Double[]{toDouble(value)};
} else {
@@ -565,6 +685,9 @@ public enum PinotDataType {
}
public String[] toStringArray(Object value) {
+ if (value instanceof String[]) {
+ return (String[]) value;
+ }
if (isSingleValue()) {
return new String[]{toString(value)};
} else {
@@ -616,12 +739,16 @@ public enum PinotDataType {
return CHARACTER;
case SHORT_ARRAY:
return SHORT;
+ case PRIMITIVE_INT_ARRAY:
case INTEGER_ARRAY:
return INTEGER;
+ case PRIMITIVE_LONG_ARRAY:
case LONG_ARRAY:
return LONG;
+ case PRIMITIVE_FLOAT_ARRAY:
case FLOAT_ARRAY:
return FLOAT;
+ case PRIMITIVE_DOUBLE_ARRAY:
case DOUBLE_ARRAY:
return DOUBLE;
case STRING_ARRAY:
@@ -633,8 +760,12 @@ public enum PinotDataType {
}
}
- public static PinotDataType getPinotDataType(FieldSpec fieldSpec) {
- FieldSpec.DataType dataType = fieldSpec.getDataType();
+ /**
+ * Returns the {@link PinotDataType} for the given {@link FieldSpec} for
data ingestion purpose. Returns object array
+ * type for multi-valued types.
+ */
+ public static PinotDataType getPinotDataTypeForIngestion(FieldSpec
fieldSpec) {
+ DataType dataType = fieldSpec.getDataType();
switch (dataType) {
case INT:
return fieldSpec.isSingleValueField() ? PinotDataType.INTEGER :
PinotDataType.INTEGER_ARRAY;
@@ -658,7 +789,11 @@ public enum PinotDataType {
}
}
- public static PinotDataType getPinotDataType(ColumnDataType columnDataType) {
+ /**
+ * Returns the {@link PinotDataType} for the given {@link ColumnDataType}
for query execution purpose. Returns
+ * primitive array type for multi-valued types.
+ */
+ public static PinotDataType getPinotDataTypeForExecution(ColumnDataType
columnDataType) {
switch (columnDataType) {
case INT:
return INTEGER;
@@ -673,13 +808,13 @@ public enum PinotDataType {
case BYTES:
return BYTES;
case INT_ARRAY:
- return INTEGER_ARRAY;
+ return PRIMITIVE_INT_ARRAY;
case LONG_ARRAY:
- return LONG_ARRAY;
+ return PRIMITIVE_LONG_ARRAY;
case FLOAT_ARRAY:
- return FLOAT_ARRAY;
+ return PRIMITIVE_FLOAT_ARRAY;
case DOUBLE_ARRAY:
- return DOUBLE_ARRAY;
+ return PRIMITIVE_DOUBLE_ARRAY;
case STRING_ARRAY:
return STRING_ARRAY;
default:
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 6297c01..da4f53a 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -43,7 +43,6 @@ import org.apache.pinot.pql.parsers.pql2.ast.LiteralAstNode;
import org.apache.pinot.pql.parsers.pql2.ast.PredicateAstNode;
import org.apache.pinot.pql.parsers.pql2.ast.StringLiteralAstNode;
import org.apache.pinot.spi.utils.BytesUtils;
-import org.apache.pinot.sql.parsers.SqlCompilationException;
public class RequestUtils {
@@ -127,62 +126,41 @@ public class RequestUtils {
return expression;
}
- public static Expression getLiteralExpression(String value) {
+ public static Expression getLiteralExpression(long value) {
Expression expression = createNewLiteralExpression();
- expression.getLiteral().setStringValue(value);
+ expression.getLiteral().setLongValue(value);
return expression;
}
- public static Expression getLiteralExpression(byte[] value) {
+ public static Expression getLiteralExpression(double value) {
Expression expression = createNewLiteralExpression();
- expression.getLiteral().setStringValue(BytesUtils.toHexString(value));
+ expression.getLiteral().setDoubleValue(value);
return expression;
}
- public static Expression getLiteralExpression(Integer value) {
- return getLiteralExpression(value.longValue());
- }
-
- public static Expression getLiteralExpression(Long value) {
+ public static Expression getLiteralExpression(String value) {
Expression expression = createNewLiteralExpression();
- expression.getLiteral().setLongValue(value);
+ expression.getLiteral().setStringValue(value);
return expression;
}
- public static Expression getLiteralExpression(Float value) {
- return getLiteralExpression(value.doubleValue());
- }
-
- public static Expression getLiteralExpression(Double value) {
+ public static Expression getLiteralExpression(byte[] value) {
Expression expression = createNewLiteralExpression();
- expression.getLiteral().setDoubleValue(value);
+ expression.getLiteral().setStringValue(BytesUtils.toHexString(value));
return expression;
}
public static Expression getLiteralExpression(Object object) {
- if (object instanceof Integer) {
- return RequestUtils.getLiteralExpression((Integer) object);
- }
- if (object instanceof Long) {
- return RequestUtils.getLiteralExpression((Long) object);
- }
- if (object instanceof Float) {
- return RequestUtils.getLiteralExpression((Float) object);
- }
- if (object instanceof Double) {
- return RequestUtils.getLiteralExpression((Double) object);
- }
- if (object instanceof String) {
- return RequestUtils.getLiteralExpression((String) object);
+ if (object instanceof Integer || object instanceof Long) {
+ return RequestUtils.getLiteralExpression(((Number) object).longValue());
}
- if (object instanceof SqlLiteral) {
- return RequestUtils.getLiteralExpression((SqlLiteral) object);
+ if (object instanceof Float || object instanceof Double) {
+ return RequestUtils.getLiteralExpression(((Number)
object).doubleValue());
}
if (object instanceof byte[]) {
return RequestUtils.getLiteralExpression((byte[]) object);
}
- throw new SqlCompilationException(
- new IllegalArgumentException("Unsupported Literal value type - " +
object.getClass()));
+ return RequestUtils.getLiteralExpression(object.toString());
}
public static Expression getFunctionExpression(String operator) {
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
index 1c6d241..6960366 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
@@ -94,11 +94,10 @@ public class PinotDataTypeTest {
@Test
public void testObject() {
- assertEquals(OBJECT.toInteger(new NumberObject("123")).intValue(), 123);
- assertEquals(OBJECT.toLong(new NumberObject("123")).intValue(), 123);
- assertEquals(OBJECT.toFloat(new NumberObject("123")).intValue(), 123);
- assertEquals(OBJECT.toDouble(new NumberObject("123")).intValue(), 123);
- assertEquals(OBJECT.toInteger(new NumberObject("123")).intValue(), 123);
+ assertEquals(OBJECT.toInt(new NumberObject("123")), 123);
+ assertEquals(OBJECT.toLong(new NumberObject("123")), 123L);
+ assertEquals(OBJECT.toFloat(new NumberObject("123")), 123f);
+ assertEquals(OBJECT.toDouble(new NumberObject("123")), 123.0);
assertEquals(OBJECT.toString(new NumberObject("123")), "123");
assertEquals(OBJECT_ARRAY.getSingleValueType(), OBJECT);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
index ee2f5bc..be4b1b6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
@@ -71,7 +71,7 @@ public class DataTypeTransformer implements RecordTransformer
{
public DataTypeTransformer(Schema schema) {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
if (!fieldSpec.isVirtualColumn()) {
- _dataTypes.put(fieldSpec.getName(),
PinotDataType.getPinotDataType(fieldSpec));
+ _dataTypes.put(fieldSpec.getName(),
PinotDataType.getPinotDataTypeForIngestion(fieldSpec));
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index b1ffac3..a0bbb90 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -31,11 +31,10 @@ import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nonnull;
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -54,13 +53,12 @@ import org.apache.pinot.spi.utils.JsonUtils;
*
*/
public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
-
public static final String FUNCTION_NAME = "jsonExtractScalar";
private static final ParseContext JSON_PARSER_CONTEXT =
JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
+
private TransformFunction _jsonFieldTransformFunction;
private String _jsonPath;
- private String _resultsType;
private Object _defaultValue = null;
private TransformResultMetadata _resultMetadata;
@@ -70,7 +68,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public void init(@Nonnull List<TransformFunction> arguments, @Nonnull
Map<String, DataSource> dataSourceMap) {
+ public void init(List<TransformFunction> arguments, Map<String, DataSource>
dataSourceMap) {
// Check that there are exactly 3 or 4 arguments
if (arguments.size() < 3 || arguments.size() > 4) {
throw new IllegalArgumentException(
@@ -84,41 +82,19 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
_jsonFieldTransformFunction = firstArgument;
_jsonPath = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
- _resultsType = ((LiteralTransformFunction)
arguments.get(2)).getLiteral().toUpperCase();
- boolean isSingleValue = !_resultsType.toUpperCase().endsWith("_ARRAY");
+ String resultsType = ((LiteralTransformFunction)
arguments.get(2)).getLiteral().toUpperCase();
+ boolean isSingleValue = !resultsType.endsWith("_ARRAY");
try {
- FieldSpec.DataType fieldType =
FieldSpec.DataType.valueOf(_resultsType.split("_ARRAY")[0]);
-
+ DataType dataType =
+ DataType.valueOf(isSingleValue ? resultsType :
resultsType.substring(0, resultsType.length() - 6));
if (arguments.size() == 4) {
- String defaultValue = ((LiteralTransformFunction)
arguments.get(3)).getLiteral();
- switch (fieldType) {
- case INT:
- _defaultValue = Double.valueOf(defaultValue).intValue();
- break;
- case LONG:
- _defaultValue = Double.valueOf(defaultValue).longValue();
- break;
- case FLOAT:
- _defaultValue = Double.valueOf(defaultValue).floatValue();
- break;
- case DOUBLE:
- _defaultValue = Double.valueOf(defaultValue);
- break;
- case BOOLEAN:
- case STRING:
- _defaultValue = defaultValue;
- break;
- case BYTES:
- throw new UnsupportedOperationException(String.format(
- "Unsupported results type: BYTES for 'jsonExtractScalar' Udf.
Supported types are:
INT/LONG/FLOAT/DOUBLE/STRING/INT_ARRAY/LONG/FLOAT_ARRAY/DOUBLE_ARRAY/STRING_ARRAY",
- _resultsType));
- }
+ _defaultValue = dataType.convert(((LiteralTransformFunction)
arguments.get(3)).getLiteral());
}
- _resultMetadata = new TransformResultMetadata(fieldType, isSingleValue,
false);
+ _resultMetadata = new TransformResultMetadata(dataType, isSingleValue,
false);
} catch (Exception e) {
- throw new UnsupportedOperationException(String.format(
- "Unsupported results type: %s for 'jsonExtractScalar' Udf. Supported
types are:
INT/LONG/FLOAT/DOUBLE/STRING/INT_ARRAY/LONG/FLOAT_ARRAY/DOUBLE_ARRAY/STRING_ARRAY",
- _resultsType));
+ throw new IllegalStateException(String.format(
+ "Unsupported results type: %s for jsonExtractScalar function.
Supported types are:
INT/LONG/FLOAT/DOUBLE/STRING/INT_ARRAY/LONG_ARRAY/FLOAT_ARRAY/DOUBLE_ARRAY/STRING_ARRAY",
+ resultsType));
}
}
@@ -128,7 +104,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public int[] transformToIntValuesSV(@Nonnull ProjectionBlock
projectionBlock) {
+ public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
final String[] stringValuesSV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final int[] results = new int[projectionBlock.getNumDocs()];
for (int i = 0; i < results.length; i++) {
@@ -151,7 +127,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public long[] transformToLongValuesSV(@Nonnull ProjectionBlock
projectionBlock) {
+ public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
final String[] stringValuesSV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final long[] results = new long[projectionBlock.getNumDocs()];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -175,7 +151,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public float[] transformToFloatValuesSV(@Nonnull ProjectionBlock
projectionBlock) {
+ public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
final String[] stringValuesSV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final float[] results = new float[projectionBlock.getNumDocs()];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -198,7 +174,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public double[] transformToDoubleValuesSV(@Nonnull ProjectionBlock
projectionBlock) {
+ public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
final String[] stringValuesSV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final double[] results = new double[projectionBlock.getNumDocs()];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -223,7 +199,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public String[] transformToStringValuesSV(@Nonnull ProjectionBlock
projectionBlock) {
+ public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
final String[] stringValuesSV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final String[] results = new String[projectionBlock.getNumDocs()];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -246,7 +222,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public int[][] transformToIntValuesMV(@Nonnull ProjectionBlock
projectionBlock) {
+ public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
final String[] stringValuesMV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final int[][] results = new int[projectionBlock.getNumDocs()][];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -264,7 +240,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public long[][] transformToLongValuesMV(@Nonnull ProjectionBlock
projectionBlock) {
+ public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
final String[] stringValuesMV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final long[][] results = new long[projectionBlock.getNumDocs()][];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -282,7 +258,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public float[][] transformToFloatValuesMV(@Nonnull ProjectionBlock
projectionBlock) {
+ public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
final String[] stringValuesMV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final float[][] results = new float[projectionBlock.getNumDocs()][];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -300,7 +276,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public double[][] transformToDoubleValuesMV(@Nonnull ProjectionBlock
projectionBlock) {
+ public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock)
{
final String[] stringValuesMV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final double[][] results = new double[projectionBlock.getNumDocs()][];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
@@ -318,7 +294,7 @@ public class JsonExtractScalarTransformFunction extends
BaseTransformFunction {
}
@Override
- public String[][] transformToStringValuesMV(@Nonnull ProjectionBlock
projectionBlock) {
+ public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock)
{
final String[] stringValuesMV =
_jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final String[][] results = new String[projectionBlock.getNumDocs()][];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
index eca96f6..d8d6151 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ScalarTransformFunctionWrapper.java
@@ -39,13 +39,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
public class ScalarTransformFunctionWrapper extends BaseTransformFunction {
private final String _name;
private final FunctionInvoker _functionInvoker;
+ private final PinotDataType _resultType;
+ private final TransformResultMetadata _resultMetadata;
private Object[] _arguments;
private int _numNonLiteralArguments;
private int[] _nonLiteralIndices;
private TransformFunction[] _nonLiteralFunctions;
private Object[][] _nonLiteralValues;
- private TransformResultMetadata _resultMetadata;
private int[] _intResults;
private float[] _floatResults;
@@ -70,6 +71,17 @@ public class ScalarTransformFunctionWrapper extends
BaseTransformFunction {
Preconditions.checkArgument(parameterTypes[i] != null, "Unsupported
parameter class: %s for method: %s",
parameterClasses[i], functionInfo.getMethod());
}
+ Class<?> resultClass = _functionInvoker.getResultClass();
+ PinotDataType resultType = FunctionUtils.getParameterType(resultClass);
+ if (resultType != null) {
+ _resultType = resultType;
+ _resultMetadata =
+ new TransformResultMetadata(FunctionUtils.getDataType(resultClass),
_resultType.isSingleValue(), false);
+ } else {
+ // Handle unrecognized result class with STRING
+ _resultType = PinotDataType.STRING;
+ _resultMetadata = new TransformResultMetadata(DataType.STRING, true,
false);
+ }
}
@Override
@@ -100,15 +112,6 @@ public class ScalarTransformFunctionWrapper extends
BaseTransformFunction {
}
}
_nonLiteralValues = new Object[_numNonLiteralArguments][];
-
- Class<?> resultClass = _functionInvoker.getResultClass();
- DataType resultDataType = FunctionUtils.getDataType(resultClass);
- // Handle unrecognized result class with STRING
- if (resultDataType == null) {
- resultDataType = DataType.STRING;
- }
- boolean isSingleValue = !resultClass.isArray();
- _resultMetadata = new TransformResultMetadata(resultDataType,
isSingleValue, false);
}
@Override
@@ -331,9 +334,9 @@ public class ScalarTransformFunctionWrapper extends
BaseTransformFunction {
private void getNonLiteralValues(ProjectionBlock projectionBlock) {
PinotDataType[] parameterTypes = _functionInvoker.getParameterTypes();
for (int i = 0; i < _numNonLiteralArguments; i++) {
- int index = _nonLiteralIndices[i];
+ PinotDataType parameterType = parameterTypes[_nonLiteralIndices[i]];
TransformFunction transformFunction = _nonLiteralFunctions[i];
- switch (parameterTypes[index]) {
+ switch (parameterType) {
case INTEGER:
_nonLiteralValues[i] =
ArrayUtils.toObject(transformFunction.transformToIntValuesSV(projectionBlock));
break;
@@ -352,23 +355,23 @@ public class ScalarTransformFunctionWrapper extends
BaseTransformFunction {
case BYTES:
_nonLiteralValues[i] =
transformFunction.transformToBytesValuesSV(projectionBlock);
break;
- case INTEGER_ARRAY:
+ case PRIMITIVE_INT_ARRAY:
_nonLiteralValues[i] =
transformFunction.transformToIntValuesMV(projectionBlock);
break;
- case LONG_ARRAY:
+ case PRIMITIVE_LONG_ARRAY:
_nonLiteralValues[i] =
transformFunction.transformToLongValuesMV(projectionBlock);
break;
- case FLOAT_ARRAY:
+ case PRIMITIVE_FLOAT_ARRAY:
_nonLiteralValues[i] =
transformFunction.transformToFloatValuesMV(projectionBlock);
break;
- case DOUBLE_ARRAY:
+ case PRIMITIVE_DOUBLE_ARRAY:
_nonLiteralValues[i] =
transformFunction.transformToDoubleValuesMV(projectionBlock);
break;
case STRING_ARRAY:
_nonLiteralValues[i] =
transformFunction.transformToStringValuesMV(projectionBlock);
break;
default:
- throw new IllegalStateException();
+ throw new IllegalStateException("Unsupported parameter type: " +
parameterType);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
index 0a046aa..085239a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/postaggregation/PostAggregationFunction.java
@@ -41,13 +41,19 @@ public class PostAggregationFunction {
Preconditions
.checkArgument(functionInfo != null, "Unsupported function: %s with %s
parameters", functionName, numArguments);
_functionInvoker = new FunctionInvoker(functionInfo);
+ Class<?>[] parameterClasses = _functionInvoker.getParameterClasses();
PinotDataType[] parameterTypes = _functionInvoker.getParameterTypes();
- Preconditions.checkArgument(numArguments == parameterTypes.length,
- "Wrong number of arguments for method: %s, expected: %s, actual: %s",
functionInfo.getMethod(),
- parameterTypes.length, numArguments);
+ int numParameters = parameterClasses.length;
+ Preconditions.checkArgument(numArguments == numParameters,
+ "Wrong number of arguments for method: %s, expected: %s, actual: %s",
functionInfo.getMethod(), numParameters,
+ numArguments);
+ for (int i = 0; i < numParameters; i++) {
+ Preconditions.checkArgument(parameterTypes[i] != null, "Unsupported
parameter class: %s for method: %s",
+ parameterClasses[i], functionInfo.getMethod());
+ }
_argumentTypes = new PinotDataType[numArguments];
for (int i = 0; i < numArguments; i++) {
- _argumentTypes[i] = PinotDataType.getPinotDataType(argumentTypes[i]);
+ _argumentTypes[i] =
PinotDataType.getPinotDataTypeForExecution(argumentTypes[i]);
}
ColumnDataType resultType =
FunctionUtils.getColumnDataType(_functionInvoker.getResultClass());
// Handle unrecognized result class with STRING
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 98d9624..d3a70c3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -35,7 +35,6 @@ import
org.apache.pinot.core.query.request.context.predicate.RangePredicate;
import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.BytesUtils;
/**
@@ -241,25 +240,9 @@ public class ColumnValueSegmentPruner implements
SegmentPruner {
private static Comparable convertValue(String stringValue, DataType
dataType) {
try {
- switch (dataType) {
- case INT:
- return Integer.valueOf(stringValue);
- case LONG:
- return Long.valueOf(stringValue);
- case FLOAT:
- return Float.valueOf(stringValue);
- case DOUBLE:
- return Double.valueOf(stringValue);
- case STRING:
- return stringValue;
- case BYTES:
- return BytesUtils.toByteArray(stringValue);
- default:
- throw new IllegalStateException();
- }
+ return dataType.convertInternal(stringValue);
} catch (Exception e) {
- throw new BadQueryRequestException(String.format("Cannot convert value:
'%s' to type: %s", stringValue, dataType),
- e);
+ throw new BadQueryRequestException(e);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index ec59d49..47ed2d8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -103,14 +103,15 @@ public class AggregationDataTableReducer implements
DataTableReducer {
}
Serializable[] finalResults = new Serializable[numAggregationFunctions];
for (int i = 0; i < numAggregationFunctions; i++) {
- finalResults[i] = AggregationFunctionUtils
-
.getSerializableValue(_aggregationFunctions[i].extractFinalResult(intermediateResults[i]));
+ AggregationFunction aggregationFunction = _aggregationFunctions[i];
+ finalResults[i] = aggregationFunction.getFinalResultColumnType()
+
.convert(aggregationFunction.extractFinalResult(intermediateResults[i]));
}
if (_responseFormatSql) {
brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
} else {
-
brokerResponseNative.setAggregationResults(reduceToAggregationResults(finalResults,
dataSchema));
+
brokerResponseNative.setAggregationResults(reduceToAggregationResults(finalResults,
dataSchema.getColumnNames()));
}
}
@@ -120,26 +121,32 @@ public class AggregationDataTableReducer implements
DataTableReducer {
private ResultTable reduceToResultTable(Object[] finalResults) {
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext,
getPrePostAggregationDataSchema());
- DataSchema resultTableSchema =
postAggregationHandler.getResultDataSchema();
- Object[] resultRow = postAggregationHandler.getResult(finalResults);
- return new ResultTable(resultTableSchema,
Collections.singletonList(resultRow));
+ DataSchema dataSchema = postAggregationHandler.getResultDataSchema();
+ Object[] row = postAggregationHandler.getResult(finalResults);
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ int numColumns = columnDataTypes.length;
+ for (int i = 0; i < numColumns; i++) {
+ row[i] = columnDataTypes[i].format(row[i]);
+ }
+ return new ResultTable(dataSchema, Collections.singletonList(row));
}
/**
* Sets aggregation results into AggregationResults
*/
- private List<AggregationResult> reduceToAggregationResults(Serializable[]
finalResults, DataSchema dataSchema) {
+ private List<AggregationResult> reduceToAggregationResults(Serializable[]
finalResults, String[] columnNames) {
int numAggregationFunctions = _aggregationFunctions.length;
List<AggregationResult> aggregationResults = new
ArrayList<>(numAggregationFunctions);
if (_preserveType) {
for (int i = 0; i < numAggregationFunctions; i++) {
- aggregationResults.add(new
AggregationResult(dataSchema.getColumnName(i), finalResults[i]));
+ aggregationResults.add(new AggregationResult(columnNames[i],
+
_aggregationFunctions[i].getFinalResultColumnType().format(finalResults[i])));
}
} else {
// Format the values into strings
for (int i = 0; i < numAggregationFunctions; i++) {
- aggregationResults.add(
- new AggregationResult(dataSchema.getColumnName(i),
AggregationFunctionUtils.formatValue(finalResults[i])));
+ aggregationResults.add(new AggregationResult(columnNames[i],
AggregationFunctionUtils
+
.formatValue(_aggregationFunctions[i].getFinalResultColumnType().format(finalResults[i]))));
}
}
return aggregationResults;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index ad3020a..97f9fa3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -30,12 +30,12 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.data.table.Record;
import
org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.QueryOptions;
@@ -86,8 +86,8 @@ public class DistinctDataTableReducer implements
DataTableReducer {
// There's no way currently to get the data types of the
distinct columns for empty results
int numColumns = columns.length;
- DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[numColumns];
- Arrays.fill(columnDataTypes, DataSchema.ColumnDataType.STRING);
+ ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+ Arrays.fill(columnDataTypes, ColumnDataType.STRING);
brokerResponseNative
.setResultTable(new ResultTable(new DataSchema(columns,
columnDataTypes), Collections.emptyList()));
} else {
@@ -117,14 +117,14 @@ public class DistinctDataTableReducer implements
DataTableReducer {
private SelectionResults reduceToSelectionResult(DistinctTable
distinctTable) {
List<Serializable[]> rows = new ArrayList<>(distinctTable.size());
DataSchema dataSchema = distinctTable.getDataSchema();
- DataSchema.ColumnDataType[] columnDataTypes =
dataSchema.getColumnDataTypes();
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
Iterator<Record> iterator = distinctTable.getFinalResult();
while (iterator.hasNext()) {
Object[] values = iterator.next().getValues();
Serializable[] row = new Serializable[numColumns];
for (int i = 0; i < numColumns; i++) {
- row[i] = SelectionOperatorUtils.convertValueToType(values[i],
columnDataTypes[i]);
+ row[i] = columnDataTypes[i].convertAndFormat(values[i]);
}
rows.add(row);
}
@@ -134,14 +134,14 @@ public class DistinctDataTableReducer implements
DataTableReducer {
private ResultTable reduceToResultTable(DistinctTable distinctTable) {
List<Object[]> rows = new ArrayList<>(distinctTable.size());
DataSchema dataSchema = distinctTable.getDataSchema();
- DataSchema.ColumnDataType[] columnDataTypes =
dataSchema.getColumnDataTypes();
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
Iterator<Record> iterator = distinctTable.getFinalResult();
while (iterator.hasNext()) {
Object[] values = iterator.next().getValues();
Object[] row = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
- row[i] = SelectionOperatorUtils.convertValueToType(values[i],
columnDataTypes[i]);
+ row[i] = columnDataTypes[i].convertAndFormat(values[i]);
}
rows.add(row);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index f0b7c29..cbf03cf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -198,6 +198,8 @@ public class GroupByDataTableReducer implements
DataTableReducer {
}
Iterator<Record> sortedIterator = indexedTable.iterator();
DataSchema prePostAggregationDataSchema =
getPrePostAggregationDataSchema(dataSchema);
+ ColumnDataType[] columnDataTypes =
prePostAggregationDataSchema.getColumnDataTypes();
+ int numColumns = columnDataTypes.length;
int limit = _queryContext.getLimit();
List<Object[]> rows = new ArrayList<>(limit);
@@ -206,13 +208,15 @@ public class GroupByDataTableReducer implements
DataTableReducer {
PostAggregationHandler postAggregationHandler =
new PostAggregationHandler(_queryContext,
prePostAggregationDataSchema);
- DataSchema resultTableSchema =
postAggregationHandler.getResultDataSchema();
FilterContext havingFilter = _queryContext.getHavingFilter();
if (havingFilter != null) {
HavingFilterHandler havingFilterHandler = new
HavingFilterHandler(havingFilter, postAggregationHandler);
while (rows.size() < limit && sortedIterator.hasNext()) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
+ for (int i = 0; i < numColumns; i++) {
+ row[i] = columnDataTypes[i].convert(row[i]);
+ }
if (havingFilterHandler.isMatch(row)) {
rows.add(row);
}
@@ -221,11 +225,25 @@ public class GroupByDataTableReducer implements
DataTableReducer {
for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
+ for (int j = 0; j < numColumns; j++) {
+ row[j] = columnDataTypes[j].convert(row[j]);
+ }
rows.add(row);
}
}
- rows.replaceAll(postAggregationHandler::getResult);
- brokerResponseNative.setResultTable(new ResultTable(resultTableSchema,
rows));
+ DataSchema resultDataSchema =
postAggregationHandler.getResultDataSchema();
+ ColumnDataType[] resultColumnDataTypes =
resultDataSchema.getColumnDataTypes();
+ int numResultColumns = resultColumnDataTypes.length;
+ int numResultRows = rows.size();
+ List<Object[]> resultRows = new ArrayList<>(numResultRows);
+ for (Object[] row : rows) {
+ Object[] resultRow = postAggregationHandler.getResult(row);
+ for (int i = 0; i < numResultColumns; i++) {
+ resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+ }
+ resultRows.add(resultRow);
+ }
+ brokerResponseNative.setResultTable(new ResultTable(resultDataSchema,
resultRows));
} else {
// PQL query with SQL group-by mode and response format
// NOTE: For PQL query, keep the order of columns as is (group-by
expressions followed by aggregations), no need
@@ -234,6 +252,9 @@ public class GroupByDataTableReducer implements
DataTableReducer {
for (int i = 0; i < limit && sortedIterator.hasNext(); i++) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
+ for (int j = 0; j < numColumns; j++) {
+ row[j] = columnDataTypes[j].convertAndFormat(row[j]);
+ }
rows.add(row);
}
brokerResponseNative.setResultTable(new
ResultTable(prePostAggregationDataSchema, rows));
@@ -246,8 +267,7 @@ public class GroupByDataTableReducer implements
DataTableReducer {
private void extractFinalAggregationResults(Object[] row) {
for (int i = 0; i < _numAggregationFunctions; i++) {
int valueIndex = i + _numGroupByExpressions;
- row[valueIndex] =
-
AggregationFunctionUtils.getSerializableValue(_aggregationFunctions[i].extractFinalResult(row[valueIndex]));
+ row[valueIndex] =
_aggregationFunctions[i].extractFinalResult(row[valueIndex]);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
index 08deed3..f56162d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/HavingFilterHandler.java
@@ -24,7 +24,6 @@ import
org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvide
import org.apache.pinot.core.query.request.context.FilterContext;
import org.apache.pinot.core.query.request.context.predicate.Predicate;
import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.utils.ByteArray;
/**
@@ -133,28 +132,7 @@ public class HavingFilterHandler {
PredicateRowMatcher(Predicate predicate) {
_valueExtractor =
_postAggregationHandler.getValueExtractor(predicate.getLhs());
- switch (_valueExtractor.getColumnDataType()) {
- case INT:
- _valueType = DataType.INT;
- break;
- case LONG:
- _valueType = DataType.LONG;
- break;
- case FLOAT:
- _valueType = DataType.FLOAT;
- break;
- case DOUBLE:
- _valueType = DataType.DOUBLE;
- break;
- case STRING:
- _valueType = DataType.STRING;
- break;
- case BYTES:
- _valueType = DataType.BYTES;
- break;
- default:
- throw new IllegalStateException();
- }
+ _valueType = _valueExtractor.getColumnDataType().toDataType();
_predicateEvaluator =
PredicateEvaluatorProvider.getPredicateEvaluator(predicate, null, _valueType);
}
@@ -173,7 +151,7 @@ public class HavingFilterHandler {
case STRING:
return _predicateEvaluator.applySV((String) value);
case BYTES:
- return _predicateEvaluator.applySV(((ByteArray) value).getBytes());
+ return _predicateEvaluator.applySV((byte[]) value);
default:
throw new IllegalStateException();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index aec9d8f..9f3a5c9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -28,6 +28,7 @@ import java.util.PriorityQueue;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -90,11 +91,13 @@ public class SelectionOperatorService {
* @return flexible {@link Comparator} for selection rows.
*/
private Comparator<Object[]>
getTypeCompatibleComparator(List<OrderByExpressionContext> orderByExpressions) {
+ ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+
// Compare all single-value columns
int numOrderByExpressions = orderByExpressions.size();
List<Integer> valueIndexList = new ArrayList<>(numOrderByExpressions);
for (int i = 0; i < numOrderByExpressions; i++) {
- if (!_dataSchema.getColumnDataType(i).isArray()) {
+ if (!columnDataTypes[i].isArray()) {
valueIndexList.add(i);
}
}
@@ -107,7 +110,7 @@ public class SelectionOperatorService {
for (int i = 0; i < numValuesToCompare; i++) {
int valueIndex = valueIndexList.get(i);
valueIndices[i] = valueIndex;
- isNumber[i] = _dataSchema.getColumnDataType(valueIndex).isNumber();
+ isNumber[i] = columnDataTypes[valueIndex].isNumber();
multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? -1 : 1;
}
@@ -166,7 +169,7 @@ public class SelectionOperatorService {
LinkedList<Serializable[]> rowsInSelectionResults = new LinkedList<>();
int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
int numColumns = columnIndices.length;
- DataSchema.ColumnDataType[] columnDataTypes =
_dataSchema.getColumnDataTypes();
+ ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
if (preserveType) {
while (_rows.size() > _offset) {
@@ -175,7 +178,7 @@ public class SelectionOperatorService {
Serializable[] extractedRow = new Serializable[numColumns];
for (int i = 0; i < numColumns; i++) {
int columnIndex = columnIndices[i];
- extractedRow[i] =
SelectionOperatorUtils.convertValueToType(row[columnIndex],
columnDataTypes[columnIndex]);
+ extractedRow[i] =
columnDataTypes[columnIndex].convertAndFormat(row[columnIndex]);
}
rowsInSelectionResults.addFirst(extractedRow);
}
@@ -204,32 +207,33 @@ public class SelectionOperatorService {
* @return {@link SelectionResults} object results.
*/
public ResultTable renderResultTableWithOrdering() {
- LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
int[] columnIndices =
SelectionOperatorUtils.getColumnIndices(_selectionColumns, _dataSchema);
int numColumns = columnIndices.length;
- DataSchema.ColumnDataType[] columnDataTypes =
_dataSchema.getColumnDataTypes();
-
- while (_rows.size() > _offset) {
- Object[] row = _rows.poll();
- assert row != null;
- Object[] extractedRow = new Object[numColumns];
- for (int i = 0; i < numColumns; i++) {
- int columnIndex = columnIndices[i];
- extractedRow[i] =
SelectionOperatorUtils.convertValueToType(row[columnIndex],
columnDataTypes[columnIndex]);
- }
- rowsInSelectionResults.addFirst(extractedRow);
- }
// Construct the result data schema
String[] columnNames = _dataSchema.getColumnNames();
+ ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
String[] resultColumnNames = new String[numColumns];
- DataSchema.ColumnDataType[] resultColumnDataTypes = new
DataSchema.ColumnDataType[numColumns];
+ ColumnDataType[] resultColumnDataTypes = new ColumnDataType[numColumns];
for (int i = 0; i < numColumns; i++) {
int columnIndex = columnIndices[i];
resultColumnNames[i] = columnNames[columnIndex];
resultColumnDataTypes[i] = columnDataTypes[columnIndex];
}
DataSchema resultDataSchema = new DataSchema(resultColumnNames,
resultColumnDataTypes);
+
+ // Extract the result rows
+ LinkedList<Object[]> rowsInSelectionResults = new LinkedList<>();
+ while (_rows.size() > _offset) {
+ Object[] row = _rows.poll();
+ assert row != null;
+ Object[] extractedRow = new Object[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ extractedRow[i] =
resultColumnDataTypes[i].convertAndFormat(row[columnIndices[i]]);
+ }
+ rowsInSelectionResults.addFirst(extractedRow);
+ }
+
return new ResultTable(resultDataSchema, rowsInSelectionResults);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index f589806..ca73042 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -35,6 +35,7 @@ import java.util.Set;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -177,8 +178,8 @@ public class SelectionOperatorUtils {
*/
public static DataSchema getResultTableDataSchema(DataSchema dataSchema,
List<String> selectionColumns) {
int numColumns = selectionColumns.size();
- Map<String, DataSchema.ColumnDataType> columnNameToDataType = new
HashMap<>();
- DataSchema.ColumnDataType[] finalColumnDataTypes = new
DataSchema.ColumnDataType[numColumns];
+ Map<String, ColumnDataType> columnNameToDataType = new HashMap<>();
+ ColumnDataType[] finalColumnDataTypes = new ColumnDataType[numColumns];
for (int i = 0; i < dataSchema.size(); i++) {
columnNameToDataType.put(dataSchema.getColumnName(i),
dataSchema.getColumnDataType(i));
}
@@ -242,7 +243,7 @@ public class SelectionOperatorUtils {
dataTableBuilder.startRow();
for (int i = 0; i < numColumns; i++) {
Object columnValue = row[i];
- DataSchema.ColumnDataType columnDataType =
dataSchema.getColumnDataType(i);
+ ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
switch (columnDataType) {
// Single-value column
case INT:
@@ -335,7 +336,7 @@ public class SelectionOperatorUtils {
Object[] row = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
- DataSchema.ColumnDataType columnDataType =
dataSchema.getColumnDataType(i);
+ ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
switch (columnDataType) {
// Single-value column
case INT:
@@ -417,13 +418,13 @@ public class SelectionOperatorUtils {
List<String> selectionColumns, boolean preserveType) {
int numRows = rows.size();
List<Serializable[]> resultRows = new ArrayList<>(numRows);
- DataSchema.ColumnDataType[] columnDataTypes =
dataSchema.getColumnDataTypes();
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
if (preserveType) {
for (Object[] row : rows) {
Serializable[] resultRow = new Serializable[numColumns];
for (int i = 0; i < numColumns; i++) {
- resultRow[i] = convertValueToType(row[i], columnDataTypes[i]);
+ resultRow[i] = columnDataTypes[i].convertAndFormat(row[i]);
}
resultRows.add(resultRow);
}
@@ -452,12 +453,12 @@ public class SelectionOperatorUtils {
public static ResultTable renderResultTableWithoutOrdering(List<Object[]>
rows, DataSchema dataSchema) {
int numRows = rows.size();
List<Object[]> resultRows = new ArrayList<>(numRows);
- DataSchema.ColumnDataType[] columnDataTypes =
dataSchema.getColumnDataTypes();
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
int numColumns = columnDataTypes.length;
for (Object[] row : rows) {
Object[] resultRow = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
- resultRow[i] = convertValueToType(row[i], columnDataTypes[i]);
+ resultRow[i] = columnDataTypes[i].convertAndFormat(row[i]);
}
resultRows.add(resultRow);
}
@@ -491,81 +492,13 @@ public class SelectionOperatorUtils {
}
/**
- * Converts a value into the given data type. (Broker side)
- * <p>Actual value type can be different with data type passed in, but they
must be type compatible.
- */
- public static Serializable convertValueToType(Object value,
DataSchema.ColumnDataType dataType) {
- switch (dataType) {
- // Single-value column
- case INT:
- return ((Number) value).intValue();
- case LONG:
- return ((Number) value).longValue();
- case FLOAT:
- return ((Number) value).floatValue();
- case DOUBLE:
- return ((Number) value).doubleValue();
- // NOTE: Return hex-encoded String for BYTES columns for
backward-compatibility
- // TODO: Revisit to see whether we should return byte[] instead
- case BYTES:
- return ((ByteArray) value).toHexString();
-
- // Multi-value column
- case LONG_ARRAY:
- // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY
- if (value instanceof int[]) {
- int[] ints = (int[]) value;
- int length = ints.length;
- long[] longs = new long[length];
- for (int i = 0; i < length; i++) {
- longs[i] = ints[i];
- }
- return longs;
- } else {
- return (long[]) value;
- }
- case DOUBLE_ARRAY:
- // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and
DOUBLE_ARRAY
- if (value instanceof int[]) {
- int[] ints = (int[]) value;
- int length = ints.length;
- double[] doubles = new double[length];
- for (int i = 0; i < length; i++) {
- doubles[i] = ints[i];
- }
- return doubles;
- } else if (value instanceof long[]) {
- long[] longs = (long[]) value;
- int length = longs.length;
- double[] doubles = new double[length];
- for (int i = 0; i < length; i++) {
- doubles[i] = longs[i];
- }
- return doubles;
- } else if (value instanceof float[]) {
- float[] floats = (float[]) value;
- int length = floats.length;
- double[] doubles = new double[length];
- for (int i = 0; i < length; i++) {
- doubles[i] = floats[i];
- }
- return doubles;
- } else {
- return (double[]) value;
- }
-
- default:
- // For STRING, INT_ARRAY, FLOAT_ARRAY and STRING_ARRAY, no need to
format
- return (Serializable) value;
- }
- }
-
- /**
+ * Deprecated because this method is only used to construct the PQL
response, and PQL is already deprecated.
* Formats a value into a {@code String} (single-value column) or {@code
String[]} (multi-value column) based on the
* data type. (Broker side)
* <p>Actual value type can be different with data type passed in, but they
must be type compatible.
*/
- public static Serializable getFormattedValue(Object value,
DataSchema.ColumnDataType dataType) {
+ @Deprecated
+ public static Serializable getFormattedValue(Object value, ColumnDataType
dataType) {
switch (dataType) {
// Single-value column
case INT:
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
index 9870242..559b201 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/HavingFilterHandlerTest.java
@@ -22,7 +22,6 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
-import org.apache.pinot.spi.utils.ByteArray;
import org.testng.annotations.Test;
import static org.testng.Assert.assertFalse;
@@ -84,20 +83,13 @@ public class HavingFilterHandlerTest {
PostAggregationHandler postAggregationHandler = new
PostAggregationHandler(queryContext, dataSchema);
HavingFilterHandler havingFilterHandler =
new HavingFilterHandler(queryContext.getHavingFilter(),
postAggregationHandler);
- assertTrue(
- havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "11",
new ByteArray(new byte[]{17}), 5}));
- assertFalse(
- havingFilterHandler.isMatch(new Object[]{10, 11L, 10.5f, 10.5, "11",
new ByteArray(new byte[]{17}), 5}));
- assertFalse(
- havingFilterHandler.isMatch(new Object[]{11, 10L, 10.5f, 10.5, "11",
new ByteArray(new byte[]{17}), 5}));
- assertFalse(
- havingFilterHandler.isMatch(new Object[]{11, 11L, 10.0f, 10.5, "11",
new ByteArray(new byte[]{17}), 5}));
- assertFalse(
- havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.0, "11",
new ByteArray(new byte[]{17}), 5}));
- assertFalse(
- havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "10",
new ByteArray(new byte[]{17}), 5}));
- assertFalse(
- havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f, 10.5, "11",
new ByteArray(new byte[]{16}), 5}));
+ assertTrue(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f,
10.5, "11", new byte[]{17}, 5}));
+ assertFalse(havingFilterHandler.isMatch(new Object[]{10, 11L, 10.5f,
10.5, "11", new byte[]{17}, 5}));
+ assertFalse(havingFilterHandler.isMatch(new Object[]{11, 10L, 10.5f,
10.5, "11", new byte[]{17}, 5}));
+ assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.0f,
10.5, "11", new byte[]{17}, 5}));
+ assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f,
10.0, "11", new byte[]{17}, 5}));
+ assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f,
10.5, "10", new byte[]{17}, 5}));
+ assertFalse(havingFilterHandler.isMatch(new Object[]{11, 11L, 10.5f,
10.5, "11", new byte[]{16}, 5}));
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
index 3011f70..9a0757b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
@@ -31,12 +31,16 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.geospatial.GeometryUtils;
import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.geospatial.transform.function.ScalarFunctions;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -52,7 +56,7 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
@@ -70,6 +74,7 @@ import static org.testng.AssertJUnit.assertNotNull;
/**
* Queries test for ST_UNION queries.
*/
+@SuppressWarnings("rawtypes")
public class StUnionQueriesTest extends BaseQueriesTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"StUnionQueriesTest");
private static final String RAW_TABLE_NAME = "testTable";
@@ -158,22 +163,50 @@ public class StUnionQueriesTest extends BaseQueriesTest {
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, NUM_RECORDS,
NUM_RECORDS);
List<Object> aggregationResult = resultsBlock.getAggregationResult();
-
assertNotNull(aggregationResult);
-
+ assertEquals(aggregationResult.size(), 1);
assertEquals(aggregationResult.get(0), _intermediateResult);
// Inter segments
String[] expectedResults = new String[1];
- expectedResults[0] = new ByteArray(_expectedResults).toHexString();
+ expectedResults[0] = BytesUtils.toHexString(_expectedResults);
BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
QueriesTestUtils
.testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
4 * NUM_RECORDS, 4 * NUM_RECORDS,
expectedResults);
- brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
- QueriesTestUtils
- .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
4 * NUM_RECORDS, 4 * NUM_RECORDS,
- expectedResults);
+ }
+
+ @Test
+ public void testPostAggregation() {
+ String query =
+ "SELECT ST_AS_TEXT(ST_UNION(pointColumn)),
TO_GEOMETRY(ST_UNION(pointColumn)),
TO_SPHERICAL_GEOGRAPHY(ST_UNION(pointColumn)),
ST_AS_TEXT(TO_SPHERICAL_GEOGRAPHY(ST_UNION(pointColumn))) FROM testTable";
+
+ // Inner segment
+ Operator operator = getOperatorForPqlQuery(query);
+ assertTrue(operator instanceof AggregationOperator);
+ IntermediateResultsBlock resultsBlock = ((AggregationOperator)
operator).nextBlock();
+
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
NUM_RECORDS, 0, NUM_RECORDS,
+ NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 4);
+ for (Object value : aggregationResult) {
+ assertEquals(value, _intermediateResult);
+ }
+
+ // Inter segment
+ BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
+ ResultTable resultTable = brokerResponse.getResultTable();
+ DataSchema expectedDataSchema = new DataSchema(
+ new String[]{"st_as_text(st_union(pointColumn))",
"to_geometry(st_union(pointColumn))",
"to_spherical_geography(st_union(pointColumn))",
"st_as_text(to_spherical_geography(st_union(pointColumn)))"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES,
ColumnDataType.BYTES, ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0), new
Object[]{ScalarFunctions.stAsText(_expectedResults), BytesUtils.toHexString(
+ ScalarFunctions.toGeometry(_expectedResults)), BytesUtils.toHexString(
+ ScalarFunctions.toSphericalGeography(_expectedResults)),
ScalarFunctions.stAsText(
+ ScalarFunctions.toSphericalGeography(_expectedResults))});
}
@Test
@@ -186,18 +219,15 @@ public class StUnionQueriesTest extends BaseQueriesTest {
IntermediateResultsBlock resultsBlock = ((AggregationOperator)
operator).nextBlock();
QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
0, 0, 0, NUM_RECORDS);
List<Object> aggregationResult = resultsBlock.getAggregationResult();
-
assertNotNull(aggregationResult);
-
+ assertEquals(aggregationResult.size(), 1);
assertEquals(aggregationResult.get(0), GeometryUtils.EMPTY_POINT);
// Inter segments
String[] expectedResults = new String[1];
- expectedResults[0] = new
ByteArray(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT)).toHexString();
+ expectedResults[0] =
BytesUtils.toHexString(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT));
BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0,
0, 4 * NUM_RECORDS, expectedResults);
- brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
- QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0,
0, 4 * NUM_RECORDS, expectedResults);
}
@Test
@@ -244,8 +274,7 @@ public class StUnionQueriesTest extends BaseQueriesTest {
assertEquals(group.size(), 1);
int key = Integer.parseInt(group.get(0));
assertTrue(_values.containsKey(key));
- assertEquals(groupByResult.getValue(),
- new
ByteArray(GeometrySerializer.serialize(_values.get(key))).toHexString());
+ assertEquals(groupByResult.getValue(),
BytesUtils.toHexString(GeometrySerializer.serialize(_values.get(key))));
}
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
index 7590d3f..d3ba43a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SumPrecisionQueriesTest.java
@@ -29,7 +29,10 @@ import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -166,12 +169,17 @@ public class SumPrecisionQueriesTest extends
BaseQueriesTest {
// Inter segment
BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
- List<Object[]> rows = brokerResponse.getResultTable().getRows();
+ ResultTable resultTable = brokerResponse.getResultTable();
+ DataSchema expectedDataSchema = new DataSchema(
+ new String[]{"sumprecision(intColumn)", "sumprecision(longColumn)",
"sumprecision(floatColumn)", "sumprecision(doubleColumn)",
"sumprecision(stringColumn)", "sumprecision(bytesColumn)"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING,
ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING,
ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+ List<Object[]> rows = resultTable.getRows();
assertEquals(rows.size(), 1);
- BigDecimal intSum = _intSum.multiply(FOUR);
- BigDecimal longSum = _longSum.multiply(FOUR);
- BigDecimal floatSum = _floatSum.multiply(FOUR);
- BigDecimal doubleSum = _doubleSum.multiply(FOUR);
+ String intSum = _intSum.multiply(FOUR).toString();
+ String longSum = _longSum.multiply(FOUR).toString();
+ String floatSum = _floatSum.multiply(FOUR).toString();
+ String doubleSum = _doubleSum.multiply(FOUR).toString();
assertEquals(rows.get(0), new Object[]{intSum, longSum, floatSum,
doubleSum, doubleSum, doubleSum});
}
@@ -195,13 +203,18 @@ public class SumPrecisionQueriesTest extends
BaseQueriesTest {
// Inter segment
BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
- List<Object[]> rows = brokerResponse.getResultTable().getRows();
+ ResultTable resultTable = brokerResponse.getResultTable();
+ DataSchema expectedDataSchema = new DataSchema(
+ new String[]{"sumprecision(intColumn)", "sumprecision(longColumn)",
"sumprecision(floatColumn)", "sumprecision(doubleColumn)",
"sumprecision(stringColumn)", "sumprecision(bytesColumn)"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING,
ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING,
ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+ List<Object[]> rows = resultTable.getRows();
assertEquals(rows.size(), 1);
MathContext mathContext = new MathContext(6, RoundingMode.HALF_EVEN);
- BigDecimal intSum = _intSum.multiply(FOUR).round(mathContext);
- BigDecimal longSum = _longSum.multiply(FOUR).round(mathContext);
- BigDecimal floatSum = _floatSum.multiply(FOUR).round(mathContext);
- BigDecimal doubleSum = _doubleSum.multiply(FOUR).round(mathContext);
+ String intSum = _intSum.multiply(FOUR).round(mathContext).toString();
+ String longSum = _longSum.multiply(FOUR).round(mathContext).toString();
+ String floatSum = _floatSum.multiply(FOUR).round(mathContext).toString();
+ String doubleSum = _doubleSum.multiply(FOUR).round(mathContext).toString();
assertEquals(rows.get(0), new Object[]{intSum, longSum, floatSum,
doubleSum, doubleSum, doubleSum});
}
@@ -225,16 +238,45 @@ public class SumPrecisionQueriesTest extends
BaseQueriesTest {
// Inter segment
BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
- List<Object[]> rows = brokerResponse.getResultTable().getRows();
+ ResultTable resultTable = brokerResponse.getResultTable();
+ DataSchema expectedDataSchema = new DataSchema(
+ new String[]{"sumprecision(intColumn)", "sumprecision(longColumn)",
"sumprecision(floatColumn)", "sumprecision(doubleColumn)",
"sumprecision(stringColumn)", "sumprecision(bytesColumn)"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING,
ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.STRING,
ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+ List<Object[]> rows = resultTable.getRows();
assertEquals(rows.size(), 1);
MathContext mathContext = new MathContext(10, RoundingMode.HALF_EVEN);
- BigDecimal intSum = _intSum.multiply(FOUR).round(mathContext).setScale(3,
RoundingMode.HALF_EVEN);
- BigDecimal longSum =
_longSum.multiply(FOUR).round(mathContext).setScale(3, RoundingMode.HALF_EVEN);
- BigDecimal floatSum =
_floatSum.multiply(FOUR).round(mathContext).setScale(3, RoundingMode.HALF_EVEN);
- BigDecimal doubleSum =
_doubleSum.multiply(FOUR).round(mathContext).setScale(3,
RoundingMode.HALF_EVEN);
+ String intSum = _intSum.multiply(FOUR).round(mathContext).setScale(3,
RoundingMode.HALF_EVEN).toString();
+ String longSum = _longSum.multiply(FOUR).round(mathContext).setScale(3,
RoundingMode.HALF_EVEN).toString();
+ String floatSum = _floatSum.multiply(FOUR).round(mathContext).setScale(3,
RoundingMode.HALF_EVEN).toString();
+ String doubleSum =
_doubleSum.multiply(FOUR).round(mathContext).setScale(3,
RoundingMode.HALF_EVEN).toString();
assertEquals(rows.get(0), new Object[]{intSum, longSum, floatSum,
doubleSum, doubleSum, doubleSum});
}
+ @Test
+ public void testPostAggregation() {
+ String query = "SELECT SUM_PRECISION(intColumn) * 2 FROM testTable";
+
+ // Inner segment
+ Operator operator = getOperatorForSqlQuery(query);
+ assertTrue(operator instanceof AggregationOperator);
+ List<Object> aggregationResult = ((AggregationOperator)
operator).nextBlock().getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 1);
+ assertEquals(aggregationResult.get(0), _intSum);
+
+ // Inter segment
+ BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query);
+ ResultTable resultTable = brokerResponse.getResultTable();
+ DataSchema expectedDataSchema = new DataSchema(new
String[]{"times(sum_precision(intColumn),'2')"},
+ new ColumnDataType[]{ColumnDataType.DOUBLE});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 1);
+ double expectedResult = _intSum.multiply(FOUR).doubleValue() * 2;
+ assertEquals(rows.get(0), new Object[]{expectedResult});
+ }
+
@AfterClass
public void tearDown()
throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index a393ca5..8afeeed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -42,22 +42,19 @@ import org.apache.pinot.spi.utils.JsonUtils;
public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable
{
private static final int DEFAULT_MAX_LENGTH = 512;
- // TODO: revisit to see if we allow 0-length byte array
- private static final byte[] NULL_BYTE_ARRAY_VALUE = new byte[0];
-
public static final Integer DEFAULT_DIMENSION_NULL_VALUE_OF_INT =
Integer.MIN_VALUE;
public static final Long DEFAULT_DIMENSION_NULL_VALUE_OF_LONG =
Long.MIN_VALUE;
public static final Float DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT =
Float.NEGATIVE_INFINITY;
public static final Double DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE =
Double.NEGATIVE_INFINITY;
public static final String DEFAULT_DIMENSION_NULL_VALUE_OF_STRING = "null";
- public static final byte[] DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES =
NULL_BYTE_ARRAY_VALUE;
+ public static final byte[] DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES = new
byte[0];
public static final Integer DEFAULT_METRIC_NULL_VALUE_OF_INT = 0;
public static final Long DEFAULT_METRIC_NULL_VALUE_OF_LONG = 0L;
public static final Float DEFAULT_METRIC_NULL_VALUE_OF_FLOAT = 0.0F;
public static final Double DEFAULT_METRIC_NULL_VALUE_OF_DOUBLE = 0.0D;
public static final String DEFAULT_METRIC_NULL_VALUE_OF_STRING = "null";
- public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES =
NULL_BYTE_ARRAY_VALUE;
+ public static final byte[] DEFAULT_METRIC_NULL_VALUE_OF_BYTES = new byte[0];
protected String _name;
protected DataType _dataType;
@@ -207,8 +204,7 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
case BYTES:
return DEFAULT_METRIC_NULL_VALUE_OF_BYTES;
default:
- throw new UnsupportedOperationException(
- "Unknown default null value for metric field of data type: "
+ dataType);
+ throw new IllegalStateException("Unsupported metric data type: "
+ dataType);
}
case DIMENSION:
case TIME:
@@ -227,11 +223,10 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
case BYTES:
return DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES;
default:
- throw new UnsupportedOperationException(
- "Unknown default null value for dimension/time field of data
type: " + dataType);
+ throw new IllegalStateException("Unsupported dimension/time data
type: " + dataType);
}
default:
- throw new UnsupportedOperationException("Unsupported field type: " +
fieldType);
+ throw new IllegalStateException("Unsupported field type: " +
fieldType);
}
}
}
@@ -276,11 +271,29 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
protected void appendDefaultNullValue(ObjectNode jsonNode) {
assert _defaultNullValue != null;
+ String key = "defaultNullValue";
if (!_defaultNullValue.equals(getDefaultNullValue(getFieldType(),
_dataType, null))) {
- if (_defaultNullValue instanceof Number) {
- jsonNode.set("defaultNullValue",
JsonUtils.objectToJsonNode(_defaultNullValue));
- } else {
- jsonNode.put("defaultNullValue", getStringValue(_defaultNullValue));
+ switch (_dataType) {
+ case INT:
+ jsonNode.put(key, (Integer) _defaultNullValue);
+ break;
+ case LONG:
+ jsonNode.put(key, (Long) _defaultNullValue);
+ break;
+ case FLOAT:
+ jsonNode.put(key, (Float) _defaultNullValue);
+ break;
+ case DOUBLE:
+ jsonNode.put(key, (Double) _defaultNullValue);
+ break;
+ case STRING:
+ jsonNode.put(key, (String) _defaultNullValue);
+ break;
+ case BYTES:
+ jsonNode.put(key, BytesUtils.toHexString((byte[])
_defaultNullValue));
+ break;
+        default:
+          throw new IllegalStateException("Unsupported data type: " + _dataType);
      }
}
}
@@ -335,6 +348,7 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
/**
* The <code>DataType</code> enum is used to demonstrate the data type of a
field.
*/
+ @SuppressWarnings("rawtypes")
public enum DataType {
// LIST is for complex lists which is different from multi-value column of
primitives
// STRUCT, MAP and LIST are composable to form a COMPLEX field
@@ -366,28 +380,6 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
}
/**
- * Converts the given string value to the data type.
- */
- public Object convert(String value) {
- switch (this) {
- case INT:
- return Integer.valueOf(value);
- case LONG:
- return Long.valueOf(value);
- case FLOAT:
- return Float.valueOf(value);
- case DOUBLE:
- return Double.valueOf(value);
- case STRING:
- return value;
- case BYTES:
- return BytesUtils.toBytes(value);
- default:
- throw new UnsupportedOperationException("Unsupported data type: " +
this);
- }
- }
-
- /**
* Check if the data type is for fixed width data (INT, LONG, FLOAT,
DOUBLE)
* or variable width data (STRING, BYTES)
*/
@@ -398,6 +390,58 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
public boolean isNumeric() {
return this == INT || this == LONG || this == FLOAT || this == DOUBLE;
}
+
+ /**
+ * Converts the given string value to the data type. Returns byte[] for
BYTES.
+ */
+ public Object convert(String value) {
+ try {
+ switch (this) {
+ case INT:
+ return Integer.valueOf(value);
+ case LONG:
+ return Long.valueOf(value);
+ case FLOAT:
+ return Float.valueOf(value);
+ case DOUBLE:
+ return Double.valueOf(value);
+ case STRING:
+ return value;
+ case BYTES:
+ return BytesUtils.toBytes(value);
+ default:
+ throw new IllegalStateException();
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Cannot convert
value: '%s' to type: %s", value, this));
+ }
+ }
+
+ /**
+ * Converts the given string value to the data type. Returns ByteArray for
BYTES.
+ */
+ public Comparable convertInternal(String value) {
+ try {
+ switch (this) {
+ case INT:
+ return Integer.valueOf(value);
+ case LONG:
+ return Long.valueOf(value);
+ case FLOAT:
+ return Float.valueOf(value);
+ case DOUBLE:
+ return Double.valueOf(value);
+ case STRING:
+ return value;
+ case BYTES:
+ return BytesUtils.toByteArray(value);
+ default:
+ throw new IllegalStateException();
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Cannot convert
value: '%s' to type: %s", value, this));
+ }
+ }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]