This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 916d807 reduce allocation rate in LookupTransformFunction (#8204)
916d807 is described below
commit 916d807c8f67b32c1a430692f74134c9c976c33d
Author: Richard Startin <[email protected]>
AuthorDate: Wed Feb 16 07:12:55 2022 +0000
reduce allocation rate in LookupTransformFunction (#8204)
* reduce allocation rate in LookupTransformFunction
* default values for array types
---
.../function/LookupTransformFunction.java | 308 +++++++++++++--------
1 file changed, 199 insertions(+), 109 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
index 8238e7a..d0debfa 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
@@ -20,10 +20,9 @@ package org.apache.pinot.core.operator.transform.function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.ArrayUtils;
+import javax.annotation.Nullable;
import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
@@ -66,8 +65,13 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class LookupTransformFunction extends BaseTransformFunction {
public static final String FUNCTION_NAME = "lookUp";
- // Lookup parameters
- private String _dimTableName;
+ private static final byte[] EMPTY_BYTES = new byte[0];
+ private static final int[] EMPTY_INTS = new int[0];
+ private static final long[] EMPTY_LONGS = new long[0];
+ private static final float[] EMPTY_FLOATS = new float[0];
+ private static final double[] EMPTY_DOUBLES = new double[0];
+ private static final String[] EMPTY_STRINGS = new String[0];
+
private String _dimColumnName;
private final List<String> _joinKeys = new ArrayList<>();
private final List<FieldSpec> _joinValueFieldSpecs = new ArrayList<>();
@@ -76,6 +80,11 @@ public class LookupTransformFunction extends
BaseTransformFunction {
private DimensionTableDataManager _dataManager;
private FieldSpec _lookupColumnFieldSpec;
+ private int _nullIntValue;
+ private long _nullLongValue;
+ private float _nullFloatValue;
+ private double _nullDoubleValue;
+
@Override
public String getName() {
return FUNCTION_NAME;
@@ -93,7 +102,8 @@ public class LookupTransformFunction extends
BaseTransformFunction {
TransformFunction dimTableNameFunction = arguments.get(0);
Preconditions.checkArgument(dimTableNameFunction instanceof
LiteralTransformFunction,
"First argument must be a literal(string) representing the dimension
table name");
- _dimTableName =
+ // Lookup parameters
+ String dimTableName =
TableNameBuilder.OFFLINE.tableNameWithType(((LiteralTransformFunction)
dimTableNameFunction).getLiteral());
TransformFunction dimColumnFunction = arguments.get(1);
@@ -117,24 +127,32 @@ public class LookupTransformFunction extends
BaseTransformFunction {
}
// Validate lookup table and relevant columns
- _dataManager =
DimensionTableDataManager.getInstanceByTableName(_dimTableName);
- Preconditions.checkArgument(_dataManager != null, "Dimension table does
not exist: %s", _dimTableName);
+ _dataManager =
DimensionTableDataManager.getInstanceByTableName(dimTableName);
+ Preconditions.checkArgument(_dataManager != null, "Dimension table does
not exist: %s", dimTableName);
_lookupColumnFieldSpec = _dataManager.getColumnFieldSpec(_dimColumnName);
Preconditions
- .checkArgument(_lookupColumnFieldSpec != null, "Column does not exist
in dimension table: %s:%s", _dimTableName,
+ .checkArgument(_lookupColumnFieldSpec != null, "Column does not exist
in dimension table: %s:%s", dimTableName,
_dimColumnName);
for (String joinKey : _joinKeys) {
FieldSpec pkColumnSpec = _dataManager.getColumnFieldSpec(joinKey);
Preconditions.checkArgument(pkColumnSpec != null, "Primary key column
doesn't exist in dimension table: %s:%s",
- _dimTableName, joinKey);
+ dimTableName, joinKey);
_joinValueFieldSpecs.add(pkColumnSpec);
}
List<String> tablePrimaryKeyColumns = _dataManager.getPrimaryKeyColumns();
Preconditions.checkArgument(_joinKeys.equals(tablePrimaryKeyColumns),
"Provided join keys (%s) must be the same as table primary keys: %s",
_joinKeys, tablePrimaryKeyColumns);
+ // Cache the default null value once per primitive width; the SV setters fall back to these.
+ Object defaultNullValue = _lookupColumnFieldSpec.getDefaultNullValue();
+ if (defaultNullValue instanceof Number) {
+ _nullIntValue = ((Number) defaultNullValue).intValue();
+ _nullLongValue = ((Number) defaultNullValue).longValue();
+ _nullFloatValue = ((Number) defaultNullValue).floatValue();
+ _nullDoubleValue = ((Number) defaultNullValue).doubleValue();
+ }
}
@Override
@@ -143,191 +161,263 @@ public class LookupTransformFunction extends
BaseTransformFunction {
_lookupColumnFieldSpec.isSingleValueField(), false);
}
- private Object[] lookup(ProjectionBlock projectionBlock) {
+ @FunctionalInterface
+ private interface ValueAcceptor {
+ void accept(int index, @Nullable Object value);
+ }
+
+ private void lookup(ProjectionBlock projectionBlock, ValueAcceptor
valueAcceptor) {
int numPkColumns = _joinKeys.size();
int numDocuments = projectionBlock.getNumDocs();
- Object[][] pkColumns = new Object[numPkColumns][];
+ Object[] pkColumns = new Object[numPkColumns];
for (int c = 0; c < numPkColumns; c++) {
DataType storedType =
_joinValueFieldSpecs.get(c).getDataType().getStoredType();
TransformFunction tf = _joinValueFunctions.get(c);
switch (storedType) {
case INT:
- pkColumns[c] =
ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+ pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
break;
case LONG:
- pkColumns[c] =
ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+ pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
break;
case FLOAT:
- pkColumns[c] =
ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+ pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
break;
case DOUBLE:
- pkColumns[c] =
ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+ pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
break;
case STRING:
pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
break;
case BYTES:
- byte[][] primitiveValues =
tf.transformToBytesValuesSV(projectionBlock);
- pkColumns[c] = new ByteArray[numDocuments];
- for (int i = 0; i < numDocuments; i++) {
- pkColumns[c][i] = new ByteArray(primitiveValues[i]);
- }
+ pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
break;
default:
throw new IllegalStateException("Unknown column type for primary
key");
}
}
- Object[] resultSet = new Object[numDocuments];
Object[] pkValues = new Object[numPkColumns];
+ PrimaryKey primaryKey = new PrimaryKey(pkValues);
for (int i = 0; i < numDocuments; i++) {
// prepare pk
for (int c = 0; c < numPkColumns; c++) {
- pkValues[c] = pkColumns[c][i];
+ if (pkColumns[c] instanceof int[]) {
+ pkValues[c] = ((int[]) pkColumns[c])[i];
+ } else if (pkColumns[c] instanceof long[]) {
+ pkValues[c] = ((long[]) pkColumns[c])[i];
+ } else if (pkColumns[c] instanceof String[]) {
+ pkValues[c] = ((String[]) pkColumns[c])[i];
+ } else if (pkColumns[c] instanceof float[]) {
+ pkValues[c] = ((float[]) pkColumns[c])[i];
+ } else if (pkColumns[c] instanceof double[]) {
+ pkValues[c] = ((double[]) pkColumns[c])[i];
+ } else if (pkColumns[c] instanceof byte[][]) {
+ pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+ }
}
// lookup
- GenericRow row = _dataManager.lookupRowByPrimaryKey(new
PrimaryKey(pkValues));
- if (row != null) {
- resultSet[i] = row.getValue(_dimColumnName);
- }
+ GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+ Object value = row == null ? null : row.getValue(_dimColumnName);
+ valueAcceptor.accept(i, value);
}
- return resultSet;
}
@Override
public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- int[] resultSet = new int[lookupObjects.length];
- Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).intValue());
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = ((Number) lookupObjects[i]).intValue();
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+ _intValuesSV = new int[numDocs];
}
- return resultSet;
+ lookup(projectionBlock, this::setIntSV);
+ return _intValuesSV;
}
@Override
public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- long[] resultSet = new long[lookupObjects.length];
- Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).longValue());
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = ((Number) lookupObjects[i]).longValue();
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+ _longValuesSV = new long[numDocs];
}
- return resultSet;
+ lookup(projectionBlock, this::setLongSV);
+ return _longValuesSV;
}
@Override
public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- float[] resultSet = new float[lookupObjects.length];
- Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = ((Number) lookupObjects[i]).floatValue();
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+ _floatValuesSV = new float[numDocs];
}
- return resultSet;
+ lookup(projectionBlock, this::setFloatSV);
+ return _floatValuesSV;
}
@Override
public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- double[] resultSet = new double[lookupObjects.length];
- Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+ _doubleValuesSV = new double[numDocs];
}
- return resultSet;
+ lookup(projectionBlock, this::setDoubleSV);
+ return _doubleValuesSV;
}
@Override
public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- String[] resultSet = new String[lookupObjects.length];
- Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = lookupObjects[i].toString();
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+ _stringValuesSV = new String[numDocs];
}
- return resultSet;
+ lookup(projectionBlock, this::setStringSV);
+ return _stringValuesSV;
}
@Override
public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- byte[][] resultSet = new byte[lookupObjects.length][0];
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = (byte[]) lookupObjects[i];
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+ _byteValuesSV = new byte[numDocs][];
}
- return resultSet;
+ lookup(projectionBlock, this::setBytesSV);
+ return _byteValuesSV;
}
@Override
public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- int[][] resultSet = new int[lookupObjects.length][0];
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = (int[]) lookupObjects[i];
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+ _intValuesMV = new int[numDocs][];
}
- return resultSet;
+ lookup(projectionBlock, this::setIntMV);
+ return _intValuesMV;
}
@Override
public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- long[][] resultSet = new long[lookupObjects.length][0];
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = (long[]) lookupObjects[i];
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+ _longValuesMV = new long[numDocs][];
}
- return resultSet;
+ lookup(projectionBlock, this::setLongMV);
+ return _longValuesMV;
}
@Override
public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
- Object[] lookupObjects = lookup(projectionBlock);
- float[][] resultSet = new float[lookupObjects.length][0];
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = (float[]) lookupObjects[i];
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+ _floatValuesMV = new float[numDocs][];
}
- return resultSet;
+ lookup(projectionBlock, this::setFloatMV);
+ return _floatValuesMV;
}
@Override
public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock)
{
- Object[] lookupObjects = lookup(projectionBlock);
- double[][] resultSet = new double[lookupObjects.length][0];
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = (double[]) lookupObjects[i];
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+ _doubleValuesMV = new double[numDocs][];
}
- return resultSet;
+ lookup(projectionBlock, this::setDoubleMV);
+ return _doubleValuesMV;
}
@Override
public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock)
{
- Object[] lookupObjects = lookup(projectionBlock);
- String[][] resultSet = new String[lookupObjects.length][0];
- for (int i = 0; i < lookupObjects.length; i++) {
- if (lookupObjects[i] != null) {
- resultSet[i] = (String[]) lookupObjects[i];
- }
+ int numDocs = projectionBlock.getNumDocs();
+ if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+ _stringValuesMV = new String[numDocs][];
+ }
+ lookup(projectionBlock, this::setStringMV);
+ return _stringValuesMV;
+ }
+
+ private void setIntSV(int index, Object value) {
+ if (value instanceof Number) {
+ _intValuesSV[index] = ((Number) value).intValue();
+ } else {
+ _intValuesSV[index] = _nullIntValue;
+ }
+ }
+
+ private void setLongSV(int index, Object value) {
+ if (value instanceof Number) {
+ _longValuesSV[index] = ((Number) value).longValue();
+ } else {
+ _longValuesSV[index] = _nullLongValue;
+ }
+ }
+
+ private void setFloatSV(int index, Object value) {
+ if (value instanceof Number) {
+ _floatValuesSV[index] = ((Number) value).floatValue();
+ } else {
+ _floatValuesSV[index] = _nullFloatValue;
+ }
+ }
+
+ private void setDoubleSV(int index, Object value) {
+ if (value instanceof Number) {
+ _doubleValuesSV[index] = ((Number) value).doubleValue();
+ } else {
+ _doubleValuesSV[index] = _nullDoubleValue;
+ }
+ }
+
+ private void setStringSV(int index, Object value) {
+ if (value != null) {
+ _stringValuesSV[index] = String.valueOf(value);
+ } else {
+ _stringValuesSV[index] =
_lookupColumnFieldSpec.getDefaultNullValueString();
+ }
+ }
+
+ private void setBytesSV(int index, Object value) {
+ if (value instanceof byte[]) {
+ _byteValuesSV[index] = (byte[]) value;
+ } else {
+ _byteValuesSV[index] = EMPTY_BYTES;
+ }
+ }
+
+ private void setIntMV(int index, Object value) {
+ if (value instanceof int[]) {
+ _intValuesMV[index] = (int[]) value;
+ } else {
+ _intValuesMV[index] = EMPTY_INTS;
+ }
+ }
+
+ private void setLongMV(int index, Object value) {
+ if (value instanceof long[]) {
+ _longValuesMV[index] = (long[]) value;
+ } else {
+ _longValuesMV[index] = EMPTY_LONGS;
+ }
+ }
+
+ private void setFloatMV(int index, Object value) {
+ if (value instanceof float[]) {
+ _floatValuesMV[index] = (float[]) value;
+ } else {
+ _floatValuesMV[index] = EMPTY_FLOATS;
+ }
+ }
+
+ private void setDoubleMV(int index, Object value) {
+ if (value instanceof double[]) {
+ _doubleValuesMV[index] = (double[]) value;
+ } else {
+ _doubleValuesMV[index] = EMPTY_DOUBLES;
+ }
+ }
+
+ private void setStringMV(int index, Object value) {
+ if (value instanceof String[]) {
+ _stringValuesMV[index] = (String[]) value;
+ } else {
+ _stringValuesMV[index] = EMPTY_STRINGS;
}
- return resultSet;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]