This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 916d807  reduce allocation rate in LookupTransformFunction (#8204)
916d807 is described below

commit 916d807c8f67b32c1a430692f74134c9c976c33d
Author: Richard Startin <[email protected]>
AuthorDate: Wed Feb 16 07:12:55 2022 +0000

    reduce allocation rate in LookupTransformFunction (#8204)
    
    * reduce allocation rate in LookupTransformFunction
    
    * default values for array types
---
 .../function/LookupTransformFunction.java          | 308 +++++++++++++--------
 1 file changed, 199 insertions(+), 109 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
index 8238e7a..d0debfa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
@@ -20,10 +20,9 @@ package org.apache.pinot.core.operator.transform.function;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.lang3.ArrayUtils;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
@@ -66,8 +65,13 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 public class LookupTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "lookUp";
 
-  // Lookup parameters
-  private String _dimTableName;
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  private static final int[] EMPTY_INTS = new int[0];
+  private static final long[] EMPTY_LONGS = new long[0];
+  private static final float[] EMPTY_FLOATS = new float[0];
+  private static final double[] EMPTY_DOUBLES = new double[0];
+  private static final String[] EMPTY_STRINGS = new String[0];
+  // Lookup parameters
   private String _dimColumnName;
   private final List<String> _joinKeys = new ArrayList<>();
   private final List<FieldSpec> _joinValueFieldSpecs = new ArrayList<>();
@@ -76,6 +80,11 @@ public class LookupTransformFunction extends 
BaseTransformFunction {
   private DimensionTableDataManager _dataManager;
   private FieldSpec _lookupColumnFieldSpec;
 
+  private int _nullIntValue;
+  private long _nullLongValue;
+  private float _nullFloatValue;
+  private double _nullDoubleValue;
+
   @Override
   public String getName() {
     return FUNCTION_NAME;
@@ -93,7 +102,8 @@ public class LookupTransformFunction extends 
BaseTransformFunction {
     TransformFunction dimTableNameFunction = arguments.get(0);
     Preconditions.checkArgument(dimTableNameFunction instanceof 
LiteralTransformFunction,
         "First argument must be a literal(string) representing the dimension 
table name");
-    _dimTableName =
+    // Resolve the dimension table name to its OFFLINE table-name-with-type form
+    String dimTableName =
         TableNameBuilder.OFFLINE.tableNameWithType(((LiteralTransformFunction) 
dimTableNameFunction).getLiteral());
 
     TransformFunction dimColumnFunction = arguments.get(1);
@@ -117,24 +127,32 @@ public class LookupTransformFunction extends 
BaseTransformFunction {
     }
 
     // Validate lookup table and relevant columns
-    _dataManager = 
DimensionTableDataManager.getInstanceByTableName(_dimTableName);
-    Preconditions.checkArgument(_dataManager != null, "Dimension table does 
not exist: %s", _dimTableName);
+    _dataManager = 
DimensionTableDataManager.getInstanceByTableName(dimTableName);
+    Preconditions.checkArgument(_dataManager != null, "Dimension table does 
not exist: %s", dimTableName);
 
     _lookupColumnFieldSpec = _dataManager.getColumnFieldSpec(_dimColumnName);
     Preconditions
-        .checkArgument(_lookupColumnFieldSpec != null, "Column does not exist 
in dimension table: %s:%s", _dimTableName,
+        .checkArgument(_lookupColumnFieldSpec != null, "Column does not exist 
in dimension table: %s:%s", dimTableName,
             _dimColumnName);
 
     for (String joinKey : _joinKeys) {
       FieldSpec pkColumnSpec = _dataManager.getColumnFieldSpec(joinKey);
       Preconditions.checkArgument(pkColumnSpec != null, "Primary key column 
doesn't exist in dimension table: %s:%s",
-          _dimTableName, joinKey);
+          dimTableName, joinKey);
       _joinValueFieldSpecs.add(pkColumnSpec);
     }
 
     List<String> tablePrimaryKeyColumns = _dataManager.getPrimaryKeyColumns();
     Preconditions.checkArgument(_joinKeys.equals(tablePrimaryKeyColumns),
         "Provided join keys (%s) must be the same as table primary keys: %s", 
_joinKeys, tablePrimaryKeyColumns);
+    // Cache the column's default null value in each numeric width; the fields stay 0 if it is not a Number
+    Object defaultNullValue = _lookupColumnFieldSpec.getDefaultNullValue();
+    if (defaultNullValue instanceof Number) {
+      _nullIntValue = ((Number) defaultNullValue).intValue();
+      _nullLongValue = ((Number) defaultNullValue).longValue();
+      _nullFloatValue = ((Number) defaultNullValue).floatValue();
+      _nullDoubleValue = ((Number) defaultNullValue).doubleValue();
+    }
   }
 
   @Override
@@ -143,191 +161,263 @@ public class LookupTransformFunction extends 
BaseTransformFunction {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, @Nullable Object value);
+  }
+  /** For each document, looks up its primary key and passes the column value (null on a miss) to the acceptor. */
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor 
valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = 
_joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = 
ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = 
ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = 
ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = 
ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = 
tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary 
key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new 
PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) 
_lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) 
_lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) 
_lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) 
_lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) 
{
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) 
{
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+  // ValueAcceptor callbacks for lookup(): store the value at index, falling back to a null/empty default
+  private void setIntSV(int index, Object value) {
+    if (value instanceof Number) {
+      _intValuesSV[index] = ((Number) value).intValue();
+    } else {
+      _intValuesSV[index] = _nullIntValue;
+    }
+  }
+
+  private void setLongSV(int index, Object value) {
+    if (value instanceof Number) {
+      _longValuesSV[index] = ((Number) value).longValue();
+    } else {
+      _longValuesSV[index] = _nullLongValue;
+    }
+  }
+
+  private void setFloatSV(int index, Object value) {
+    if (value instanceof Number) {
+      _floatValuesSV[index] = ((Number) value).floatValue();
+    } else {
+      _floatValuesSV[index] = _nullFloatValue;
+    }
+  }
+
+  private void setDoubleSV(int index, Object value) {
+    if (value instanceof Number) {
+      _doubleValuesSV[index] = ((Number) value).doubleValue();
+    } else {
+      _doubleValuesSV[index] = _nullDoubleValue;
+    }
+  }
+
+  private void setStringSV(int index, Object value) {
+    if (value != null) {
+      _stringValuesSV[index] = String.valueOf(value);
+    } else {
+      _stringValuesSV[index] = 
_lookupColumnFieldSpec.getDefaultNullValueString();
+    }
+  }
+
+  private void setBytesSV(int index, Object value) {
+    if (value instanceof byte[]) {
+      _byteValuesSV[index] = (byte[]) value;
+    } else {
+      _byteValuesSV[index] = EMPTY_BYTES;
+    }
+  }
+
+  private void setIntMV(int index, Object value) {
+    if (value instanceof int[]) {
+      _intValuesMV[index] = (int[]) value;
+    } else {
+      _intValuesMV[index] = EMPTY_INTS;
+    }
+  }
+
+  private void setLongMV(int index, Object value) {
+    if (value instanceof long[]) {
+      _longValuesMV[index] = (long[]) value;
+    } else {
+      _longValuesMV[index] = EMPTY_LONGS;
+    }
+  }
+
+  private void setFloatMV(int index, Object value) {
+    if (value instanceof float[]) {
+      _floatValuesMV[index] = (float[]) value;
+    } else {
+      _floatValuesMV[index] = EMPTY_FLOATS;
+    }
+  }
+
+  private void setDoubleMV(int index, Object value) {
+    if (value instanceof double[]) {
+      _doubleValuesMV[index] = (double[]) value;
+    } else {
+      _doubleValuesMV[index] = EMPTY_DOUBLES;
+    }
+  }
+
+  private void setStringMV(int index, Object value) {
+    if (value instanceof String[]) {
+      _stringValuesMV[index] = (String[]) value;
+    } else {
+      _stringValuesMV[index] = EMPTY_STRINGS;
     }
-    return resultSet;
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to