This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d04785c Introduce 'LOOKUP' Transform Function (#6383)
d04785c is described below
commit d04785c83f5740a5cec0a2c30d570949304cb8ad
Author: Caner Balci <[email protected]>
AuthorDate: Thu Jan 7 18:00:33 2021 -0800
Introduce 'LOOKUP' Transform Function (#6383)
LOOKUP is a regular transform function which uses the previously added
DimensionTableDataManager to execute a lookup from a Dimension table. Call
signature is as follows:
LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2, JoinValue2
...])
- TableName: name of the dimension table which will be used
- ColumnName: column name from the dimension table to look up
- JoinKey: primary key column name for the dimension table. Note: Only
primary key columns are supported for JoinKey
- JoinValue: primary key value
- *If the dimension table has more than one primary key column (composite PK),
you can add more keys and values for the rest of the args: JoinKey2, JoinValue2
... etc.
---
.../common/function/TransformFunctionType.java | 1 +
.../manager/offline/DimensionTableDataManager.java | 11 +
.../function/LookupTransformFunction.java | 332 ++++++++++++++++++
.../function/TransformFunctionFactory.java | 1 +
.../offline/DimensionTableDataManagerTest.java | 5 +
.../function/BaseTransformFunctionTest.java | 21 ++
.../function/LookupTransformFunctionTest.java | 386 +++++++++++++++++++++
.../org/apache/pinot/tools/JoinQuickStart.java | 7 +
8 files changed, 764 insertions(+)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index eabd94b..666d116 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -63,6 +63,7 @@ public enum TransformFunctionType {
VALUEIN("valueIn"),
MAPVALUE("mapValue"),
INIDSET("inIdSet"),
+ LOOKUP("lookUp"),
GROOVY("groovy"),
// Special type for annotation based scalar functions
SCALAR("scalar"),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index eea60f8..fafb23c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.core.data.manager.offline;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,6 +66,11 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
return _instances.computeIfAbsent(tableNameWithType, k -> new
DimensionTableDataManager());
}
+ @VisibleForTesting
+ public static DimensionTableDataManager registerDimensionTable(String
tableNameWithType, DimensionTableDataManager instance) {
+ return _instances.computeIfAbsent(tableNameWithType, k -> instance);
+ }
+
public static DimensionTableDataManager getInstanceByTableName(String
tableNameWithType) {
return _instances.get(tableNameWithType);
}
@@ -161,4 +168,8 @@ public class DimensionTableDataManager extends
OfflineTableDataManager {
public FieldSpec getColumnFieldSpec(String columnName) {
return _tableSchema.getFieldSpecFor(columnName);
}
+
+ public List<String> getPrimaryKeyColumns() {
+ return _primaryKeyColumns;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
new file mode 100644
index 0000000..eb1aeba
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * LOOKUP function takes 4 or more arguments:
+ * <ul>
+ * <li><b>TableName:</b> name of the dimension table which will be used</li>
+ * <li><b>ColumnName:</b> column name from the dimension table to look
up</li>
+ * <li><b>JoinKey:</b> primary key column name for the dimension table.
Note: Only primary key[s] are supported for JoinKey</li>
+ * <li><b>JoinValue:</b> primary key value</li>
+ * ...<br>
+ * *[If the dimension table has more than one primary key column (composite PK)]
+ * <li><b>JoinKey2</b></li>
+ * <li><b>JoinValue2</b></li>
+ * ...
+ * </ul>
+ * <br>
+ * Example:
+ * <pre>{@code SELECT
+ * baseballStats.playerName,
+ * baseballStats.teamID,
+ * LOOKUP('dimBaseballTeams', 'teamName', 'teamID', baseballStats.teamID)
+ * FROM
+ * baseballStats
+ * LIMIT 10}</pre>
+ * <br>
+ * The example above joins the dimension table 'dimBaseballTeams' to the regular table
'baseballStats' on the 'teamID' key.
+ * Lookup function returns the value of the column 'teamName'.
+ */
+public class LookupTransformFunction extends BaseTransformFunction {
+ public static final String FUNCTION_NAME = "lookUp";
+
+ // Lookup parameters
+ private String _dimTableName;
+ private String _dimColumnName;
+ private final List<String> _joinKeys = new ArrayList<>();
+ private final List<FieldSpec> _joinValueFieldSpecs = new ArrayList<>();
+ private final List<TransformFunction> _joinValueFunctions = new
ArrayList<>();
+
+ private DimensionTableDataManager _dataManager;
+ private FieldSpec _lookupColumnFieldSpec;
+
+ @Override
+ public String getName() {
+ return FUNCTION_NAME;
+ }
+
+ @Override
+ public void init(List<TransformFunction> arguments, Map<String, DataSource>
dataSourceMap) {
+ // Check that there are correct number of arguments
+ Preconditions
+ .checkArgument(arguments.size() >= 4,
+ "At least 4 arguments are required for LOOKUP transform function:
" +
+ "LOOKUP(TableName, ColumnName, JoinKey, JoinValue [, JoinKey2,
JoinValue2 ...])");
+ Preconditions
+ .checkArgument(arguments.size() % 2 == 0, "Should have the same number
of JoinKey and JoinValue arguments");
+
+ TransformFunction dimTableNameFunction = arguments.get(0);
+ Preconditions.checkArgument(dimTableNameFunction instanceof
LiteralTransformFunction,
+ "First argument must be a literal(string) representing the dimension
table name");
+ _dimTableName = TableNameBuilder.OFFLINE.tableNameWithType(
+ ((LiteralTransformFunction) dimTableNameFunction).getLiteral());
+
+ TransformFunction dimColumnFunction = arguments.get(1);
+ Preconditions.checkArgument(dimColumnFunction instanceof
LiteralTransformFunction,
+ "Second argument must be a literal(string) representing the column
name from dimension table to lookup");
+ _dimColumnName = ((LiteralTransformFunction)
dimColumnFunction).getLiteral();
+
+ List<TransformFunction> joinArguments = arguments.subList(2,
arguments.size());
+ int numJoinArguments = joinArguments.size();
+ for (int i = 0; i < numJoinArguments / 2; i++) {
+ TransformFunction dimJoinKeyFunction = joinArguments.get((i * 2));
+ Preconditions.checkArgument(dimJoinKeyFunction instanceof
LiteralTransformFunction,
+ "JoinKey argument must be a literal(string) representing the primary
key for the dimension table");
+ _joinKeys.add(((LiteralTransformFunction)
dimJoinKeyFunction).getLiteral());
+
+ TransformFunction factJoinValueFunction = joinArguments.get((i * 2) + 1);
+ TransformResultMetadata factJoinValueFunctionResultMetadata =
factJoinValueFunction.getResultMetadata();
+
Preconditions.checkArgument(factJoinValueFunctionResultMetadata.isSingleValue(),
+ "JoinValue argument must be a single value expression");
+ _joinValueFunctions.add(factJoinValueFunction);
+ }
+
+ // Validate lookup table and relevant columns
+ _dataManager =
DimensionTableDataManager.getInstanceByTableName(_dimTableName);
+ Preconditions.checkArgument(_dataManager != null,
+ "Dimension table does not exist: %s", _dimTableName);
+
+ _lookupColumnFieldSpec = _dataManager.getColumnFieldSpec(_dimColumnName);
+ Preconditions.checkArgument(_lookupColumnFieldSpec != null,
+ "Column does not exist in dimension table: %s:%s", _dimTableName,
_dimColumnName);
+
+ for (String joinKey : _joinKeys) {
+ FieldSpec pkColumnSpec = _dataManager.getColumnFieldSpec(joinKey);
+ Preconditions.checkArgument(pkColumnSpec != null,
+ "Primary key column doesn't exist in dimension table: %s:%s",
_dimTableName, joinKey);
+ _joinValueFieldSpecs.add(pkColumnSpec);
+ }
+
+ List<String> tablePrimaryKeyColumns = _dataManager.getPrimaryKeyColumns();
+ Preconditions.checkArgument(_joinKeys.equals(tablePrimaryKeyColumns),
+ "Provided join keys (%s) must be the same as table primary keys: %s",
_joinKeys, tablePrimaryKeyColumns);
+ }
+
+ @Override
+ public TransformResultMetadata getResultMetadata() {
+ return new TransformResultMetadata(_lookupColumnFieldSpec.getDataType(),
+ _lookupColumnFieldSpec.isSingleValueField(), false);
+ }
+
+ private Object[] lookup(ProjectionBlock projectionBlock) {
+ int numPkColumns = _joinKeys.size();
+ int numDocuments = projectionBlock.getNumDocs();
+ Object[][] pkColumns = new Object[numPkColumns][];
+ for (int c = 0; c < numPkColumns; c++) {
+ FieldSpec.DataType colType = _joinValueFieldSpecs.get(c).getDataType();
+ TransformFunction tf = _joinValueFunctions.get(c);
+ switch (colType) {
+ case INT:
+ pkColumns[c] =
ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+ break;
+ case LONG:
+ pkColumns[c] =
ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+ break;
+ case FLOAT:
+ pkColumns[c] =
ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+ break;
+ case DOUBLE:
+ pkColumns[c] =
ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+ break;
+ case STRING:
+ pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
+ break;
+ case BYTES:
+ byte[][] primitiveValues =
tf.transformToBytesValuesSV(projectionBlock);
+ pkColumns[c] = new ByteArray[numDocuments];
+ for (int i = 0; i < numDocuments; i++) {
+ pkColumns[c][i] = new ByteArray(primitiveValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown column type for primary
key");
+ }
+ }
+
+ Object[] resultSet = new Object[numDocuments];
+ Object[] pkValues = new Object[numPkColumns];
+ for (int i = 0; i < numDocuments; i++) {
+ // prepare pk
+ for (int c = 0; c < numPkColumns; c++) {
+ pkValues[c] = pkColumns[c][i];
+ }
+ // lookup
+ GenericRow row = _dataManager.lookupRowByPrimaryKey(new
PrimaryKey(pkValues));
+ if (row != null) {
+ resultSet[i] = row.getValue(_dimColumnName);
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ int[] resultSet = new int[lookupObjects.length];
+ Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).intValue());
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = ((Number) lookupObjects[i]).intValue();
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ long[] resultSet = new long[lookupObjects.length];
+ Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).longValue());
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = ((Number) lookupObjects[i]).longValue();
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ float[] resultSet = new float[lookupObjects.length];
+ Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = ((Number) lookupObjects[i]).floatValue();
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ double[] resultSet = new double[lookupObjects.length];
+ Arrays.fill(resultSet, ((Number)
_lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ String[] resultSet = new String[lookupObjects.length];
+ Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = lookupObjects[i].toString();
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ byte[][] resultSet = new byte[lookupObjects.length][0];
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = (byte[]) lookupObjects[i];
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ int[][] resultSet = new int[lookupObjects.length][0];
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = (int[]) lookupObjects[i];
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ long[][] resultSet = new long[lookupObjects.length][0];
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = (long[]) lookupObjects[i];
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
+ Object[] lookupObjects = lookup(projectionBlock);
+ float[][] resultSet = new float[lookupObjects.length][0];
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = (float[]) lookupObjects[i];
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock)
{
+ Object[] lookupObjects = lookup(projectionBlock);
+ double[][] resultSet = new double[lookupObjects.length][0];
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = (double[]) lookupObjects[i];
+ }
+ }
+ return resultSet;
+ }
+
+ @Override
+ public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock)
{
+ Object[] lookupObjects = lookup(projectionBlock);
+ String[][] resultSet = new String[lookupObjects.length][0];
+ for (int i = 0; i < lookupObjects.length; i++) {
+ if (lookupObjects[i] != null) {
+ resultSet[i] = (String[]) lookupObjects[i];
+ }
+ }
+ return resultSet;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index 3d6ff94..fb0c826 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -92,6 +92,7 @@ public class TransformFunctionFactory {
put(canonicalize(TransformFunctionType.VALUEIN.getName().toLowerCase()),
ValueInTransformFunction.class);
put(canonicalize(TransformFunctionType.MAPVALUE.getName().toLowerCase()),
MapValueTransformFunction.class);
put(canonicalize(TransformFunctionType.INIDSET.getName().toLowerCase()),
InIdSetTransformFunction.class);
+
put(canonicalize(TransformFunctionType.LOOKUP.getName().toLowerCase()),
LookupTransformFunction.class);
// Array functions
put(canonicalize(TransformFunctionType.ARRAYAVERAGE.getName().toLowerCase()),
ArrayAverageTransformFunction.class);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index d489144..23dd01f 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager.offline;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.File;
import java.net.URL;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.helix.AccessOption;
@@ -160,6 +161,10 @@ public class DimensionTableDataManagerTest {
Assert.assertEquals(spec.getDataType(), FieldSpec.DataType.STRING,
"Should return correct data type for teamName column");
+ // Confirm we can read the primary key column list
+ List<String> pkColumns = mgr.getPrimaryKeyColumns();
+ Assert.assertEquals(pkColumns, Arrays.asList("teamID"), "Should return PK
column list");
+
// Remove the segment
List<SegmentDataManager> segmentManagers = mgr.acquireAllSegments();
Assert.assertEquals(segmentManagers.size(), 1, "Should have exactly one
segment manager");
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 91eb467..6beca8a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -247,6 +247,27 @@ public abstract class BaseTransformFunctionTest {
}
}
+ protected void testTransformFunctionMV(TransformFunction transformFunction,
long[][] expectedValues) {
+ long[][] longMVValues =
transformFunction.transformToLongValuesMV(_projectionBlock);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ Assert.assertEquals(longMVValues[i], expectedValues[i]);
+ }
+ }
+
+ protected void testTransformFunctionMV(TransformFunction transformFunction,
float[][] expectedValues) {
+ float[][] floatMVValues =
transformFunction.transformToFloatValuesMV(_projectionBlock);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ Assert.assertEquals(floatMVValues[i], expectedValues[i]);
+ }
+ }
+
+ protected void testTransformFunctionMV(TransformFunction transformFunction,
double[][] expectedValues) {
+ double[][] doubleMVValues =
transformFunction.transformToDoubleValuesMV(_projectionBlock);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ Assert.assertEquals(doubleMVValues[i], expectedValues[i]);
+ }
+ }
+
protected void testTransformFunctionMV(TransformFunction transformFunction,
String[][] expectedValues) {
String[][] stringMVValues =
transformFunction.transformToStringValuesMV(_projectionBlock);
for (int i = 0; i < NUM_ROWS; i++) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
new file mode 100644
index 0000000..93e3633
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunctionTest.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.Arrays;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.common.utils.PrimitiveArrayUtils;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.*;
+
+
+public class LookupTransformFunctionTest extends BaseTransformFunctionTest {
+ private static final String TABLE_NAME = "baseballTeams_OFFLINE";
+ private DimensionTableDataManager tableManager;
+
+ @BeforeSuite
+ public void setUp()
+ throws Exception {
+ super.setUp();
+
+ createTestableTableManager();
+ }
+
+ private void createTestableTableManager() {
+ tableManager = mock(DimensionTableDataManager.class);
+ DimensionTableDataManager.registerDimensionTable(TABLE_NAME, tableManager);
+
+ // Creating a mock table which looks like:
+ // TeamID (PK, str) | TeamName(str) | TeamName_MV(str[]) |
TeamInteger(int) | TeamInteger_MV(int[]) | TeamFloat(float) | ...
+ //
+ // All values are dynamically created to be variations of the primary key.
+ // e.g
+ // lookupRowByPrimaryKey(['FOO']) -> (TeamName:
'teamName_for_[FOO]', TeamInteger: hashCode(['FOO']), ...
+ //
+
when(tableManager.getPrimaryKeyColumns()).thenReturn(Arrays.asList("teamID"));
+ when(tableManager.getColumnFieldSpec("teamID"))
+ .thenReturn(new DimensionFieldSpec("teamID",
FieldSpec.DataType.STRING, true));
+ when(tableManager.getColumnFieldSpec("teamName"))
+ .thenReturn(new DimensionFieldSpec("teamName",
FieldSpec.DataType.STRING, true));
+ when(tableManager.getColumnFieldSpec("teamName_MV"))
+ .thenReturn(new DimensionFieldSpec("teamName_MV",
FieldSpec.DataType.STRING, false));
+ when(tableManager.getColumnFieldSpec("teamInteger"))
+ .thenReturn(new DimensionFieldSpec("teamInteger",
FieldSpec.DataType.INT, true));
+ when(tableManager.getColumnFieldSpec("teamInteger_MV"))
+ .thenReturn(new DimensionFieldSpec("teamInteger_MV",
FieldSpec.DataType.INT, false));
+ when(tableManager.getColumnFieldSpec("teamFloat"))
+ .thenReturn(new DimensionFieldSpec("teamFloat",
FieldSpec.DataType.FLOAT, true));
+ when(tableManager.getColumnFieldSpec("teamFloat_MV"))
+ .thenReturn(new DimensionFieldSpec("teamFloat_MV",
FieldSpec.DataType.FLOAT, false));
+ when(tableManager.getColumnFieldSpec("teamDouble"))
+ .thenReturn(new DimensionFieldSpec("teamDouble",
FieldSpec.DataType.DOUBLE, true));
+ when(tableManager.getColumnFieldSpec("teamDouble_MV"))
+ .thenReturn(new DimensionFieldSpec("teamDouble_MV",
FieldSpec.DataType.DOUBLE, false));
+ when(tableManager.getColumnFieldSpec("teamLong"))
+ .thenReturn(new DimensionFieldSpec("teamLong",
FieldSpec.DataType.LONG, true));
+ when(tableManager.getColumnFieldSpec("teamLong_MV"))
+ .thenReturn(new DimensionFieldSpec("teamLong_MV",
FieldSpec.DataType.LONG, false));
+ when(tableManager.getColumnFieldSpec("teamBytes"))
+ .thenReturn(new DimensionFieldSpec("teamNameBytes",
FieldSpec.DataType.BYTES, true));
+
when(tableManager.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation
-> {
+ PrimaryKey key = invocation.getArgument(0);
+ GenericRow row = new GenericRow();
+ row.putValue("teamName", "teamName_for_" + key.toString());
+ row.putValue("teamName_MV",
+ new String[]{"teamName_for_" + key.toString() + "_1",
"teamName_for_" + key.toString() + "_2",});
+ row.putValue("teamInteger", key.hashCode());
+ row.putValue("teamInteger_MV", new int[]{key.hashCode(),
key.hashCode()});
+ row.putValue("teamFloat", (float) key.hashCode());
+ row.putValue("teamFloat_MV", new float[]{(float) key.hashCode(), (float)
key.hashCode()});
+ row.putValue("teamDouble", (double) key.hashCode());
+ row.putValue("teamDouble_MV", new double[]{(double) key.hashCode(),
(double) key.hashCode()});
+ row.putValue("teamLong", (long) key.hashCode());
+ row.putValue("teamLong_MV", new long[]{(long) key.hashCode(), (long)
key.hashCode()});
+ row.putValue("teamBytes", ("teamBytes_for_" +
key.toString()).getBytes());
+ return row;
+ });
+ }
+
+  /**
+   * Checks that a well-formed LOOKUP call is resolved to {@link LookupTransformFunction} and that each kind of
+   * malformed call is rejected with {@link BadQueryRequestException}.
+   */
+  @Test
+  public void instantiationTests()
+      throws Exception {
+    // Success case
+    ExpressionContext expression = QueryContextConverterUtils
+        .getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)", STRING_SV_COLUMN));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof LookupTransformFunction);
+    Assert.assertEquals(transformFunction.getName(), LookupTransformFunction.FUNCTION_NAME);
+
+    // Wrong number of arguments (JoinValue is missing). The query has no placeholders, so it is passed as a
+    // plain literal instead of going through String.format.
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory
+          .get(QueryContextConverterUtils.getExpression("lookup('baseballTeams','teamName','teamID')"),
+              _dataSourceMap);
+    });
+
+    // Wrong number of join keys (a JoinKey without a matching JoinValue)
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils.getExpression(
+          String.format("lookup('baseballTeams','teamName','teamID', %s, 'danglingKey')", STRING_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Non literal tableName argument
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup(%s,'teamName','teamID', %s)", STRING_SV_COLUMN, INT_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Non literal lookup columnName argument
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup('baseballTeams',%s,'teamID',%s)", STRING_SV_COLUMN, INT_SV_COLUMN)),
+          _dataSourceMap);
+    });
+
+    // Non literal JoinKey argument
+    Assert.assertThrows(BadQueryRequestException.class, () -> {
+      TransformFunctionFactory.get(QueryContextConverterUtils
+              .getExpression(String.format("lookup('baseballTeams','teamName',%s,%s)", STRING_SV_COLUMN, INT_SV_COLUMN)),
+          _dataSourceMap);
+    });
+  }
+
+  /**
+   * Checks that the result metadata of LOOKUP reports the expected data type for each single-value lookup column.
+   */
+  @Test
+  public void resultDataTypeTest()
+      throws Exception {
+    // Lookup column name -> data type the transform function is expected to report for it.
+    // A plain HashMap is used instead of double-brace initialization, which creates an anonymous subclass.
+    Map<String, FieldSpec.DataType> testCases = new HashMap<>();
+    testCases.put("teamName", FieldSpec.DataType.STRING);
+    testCases.put("teamInteger", FieldSpec.DataType.INT);
+    testCases.put("teamFloat", FieldSpec.DataType.FLOAT);
+    testCases.put("teamLong", FieldSpec.DataType.LONG);
+    testCases.put("teamDouble", FieldSpec.DataType.DOUBLE);
+
+    for (Map.Entry<String, FieldSpec.DataType> testCase : testCases.entrySet()) {
+      ExpressionContext expression = QueryContextConverterUtils.getExpression(
+          String.format("lookup('baseballTeams','%s','teamID',%s)", testCase.getKey(), STRING_SV_COLUMN));
+      TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+      // Message arguments follow the placeholders: expected data type first, then the column name.
+      Assert.assertEquals(transformFunction.getResultMetadata().getDataType(), testCase.getValue(),
+          String.format("Expecting %s data type for lookup column: '%s'", testCase.getValue(), testCase.getKey()));
+    }
+  }
+
+ @Test
+ public void basicLookupTests()
+ throws Exception {
+ // Lookup col: StringSV
+ // PK: [String]
+ ExpressionContext expression = QueryContextConverterUtils
+
.getExpression(String.format("lookup('baseballTeams','teamName','teamID',%s)",
STRING_SV_COLUMN));
+ TransformFunction transformFunction =
TransformFunctionFactory.get(expression, _dataSourceMap);
+ String[] expectedStringValues = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ expectedStringValues[i] = String.format("teamName_for_[%s]",
_stringSVValues[i]);
+ }
+ testTransformFunction(transformFunction, expectedStringValues);
+
+ // Lookup col: IntSV
+ // PK: [String]
+ expression = QueryContextConverterUtils
+
.getExpression(String.format("lookup('baseballTeams','teamInteger','teamID',%s)",
STRING_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ int[] expectedIntValues = new int[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ expectedIntValues[i] = (new PrimaryKey(new
Object[]{_stringSVValues[i]})).hashCode();
+ }
+ testTransformFunction(transformFunction, expectedIntValues);
+
+ // Lookup col: DoubleSV
+ // PK: [String]
+ expression = QueryContextConverterUtils
+
.getExpression(String.format("lookup('baseballTeams','teamDouble','teamID',%s)",
STRING_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ double[] expectedDoubleValues = new double[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ expectedDoubleValues[i] = (double) (new PrimaryKey(new
Object[]{_stringSVValues[i]})).hashCode();
+ }
+ testTransformFunction(transformFunction, expectedDoubleValues);
+
+ // Lookup col: BytesSV
+ // PK: [String]
+ expression = QueryContextConverterUtils
+
.getExpression(String.format("lookup('baseballTeams','teamBytes','teamID',%s)",
STRING_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ byte[][] expectedBytesValues = new byte[NUM_ROWS][];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ expectedBytesValues[i] = String.format("teamBytes_for_[%s]",
_stringSVValues[i]).getBytes();
+ }
+ testTransformFunction(transformFunction, expectedBytesValues);
+ }
+
+ /**
+  * Verifies LOOKUP against multi-value columns of the 'baseballTeams' dimension table, joining on
+  * 'teamID' with the values of STRING_SV_COLUMN.
+  *
+  * One lookup column per MV type is exercised: String, Integer, Float, Long and Double.
+  * NOTE(review): the expectations assume the table set up elsewhere in this class answers every lookup
+  * with a two-element array - "teamName_for_[key]_1"/"_2" for strings, and the PrimaryKey hash code
+  * repeated twice for the numeric columns - confirm against the fixture.
+  */
+ @Test
+ public void multiValueLookupTests()
+     throws Exception {
+   // Lookup col: StringMV
+   // PK: [String]
+   ExpressionContext expression = QueryContextConverterUtils
+       .getExpression(String.format("lookup('baseballTeams','teamName_MV','teamID',%s)", STRING_SV_COLUMN));
+   TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+   String[][] expectedStringMVValues = new String[NUM_ROWS][];
+   for (int i = 0; i < NUM_ROWS; i++) {
+     expectedStringMVValues[i] = new String[]{
+         String.format("teamName_for_[%s]_1", _stringSVValues[i]),
+         String.format("teamName_for_[%s]_2", _stringSVValues[i])
+     };
+   }
+   testTransformFunctionMV(transformFunction, expectedStringMVValues);
+
+   // Lookup col: IntegerMV
+   // PK: [String]
+   expression = QueryContextConverterUtils
+       .getExpression(String.format("lookup('baseballTeams','teamInteger_MV','teamID',%s)", STRING_SV_COLUMN));
+   transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+   // Only the outer dimension is allocated; every row is assigned in the loop below.
+   int[][] expectedIntegerMVValues = new int[NUM_ROWS][];
+   for (int i = 0; i < NUM_ROWS; i++) {
+     // Both expected elements are the hash of the same single-column key, so compute it once.
+     int keyHash = new PrimaryKey(new Object[]{_stringSVValues[i]}).hashCode();
+     expectedIntegerMVValues[i] = new int[]{keyHash, keyHash};
+   }
+   testTransformFunctionMV(transformFunction, expectedIntegerMVValues);
+
+   // Lookup col: FloatMV
+   // PK: [String]
+   expression = QueryContextConverterUtils
+       .getExpression(String.format("lookup('baseballTeams','teamFloat_MV','teamID',%s)", STRING_SV_COLUMN));
+   transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+   float[][] expectedFloatMVValues = new float[NUM_ROWS][];
+   for (int i = 0; i < NUM_ROWS; i++) {
+     int keyHash = new PrimaryKey(new Object[]{_stringSVValues[i]}).hashCode();
+     // The int hash is widened to float by the array initializer.
+     expectedFloatMVValues[i] = new float[]{keyHash, keyHash};
+   }
+   testTransformFunctionMV(transformFunction, expectedFloatMVValues);
+
+   // Lookup col: LongMV
+   // PK: [String]
+   expression = QueryContextConverterUtils
+       .getExpression(String.format("lookup('baseballTeams','teamLong_MV','teamID',%s)", STRING_SV_COLUMN));
+   transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+   long[][] expectedLongMVValues = new long[NUM_ROWS][];
+   for (int i = 0; i < NUM_ROWS; i++) {
+     int keyHash = new PrimaryKey(new Object[]{_stringSVValues[i]}).hashCode();
+     // The int hash is widened to long by the array initializer.
+     expectedLongMVValues[i] = new long[]{keyHash, keyHash};
+   }
+   testTransformFunctionMV(transformFunction, expectedLongMVValues);
+
+   // Lookup col: DoubleMV
+   // PK: [String]
+   expression = QueryContextConverterUtils
+       .getExpression(String.format("lookup('baseballTeams','teamDouble_MV','teamID',%s)", STRING_SV_COLUMN));
+   transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+   double[][] expectedDoubleMVValues = new double[NUM_ROWS][];
+   for (int i = 0; i < NUM_ROWS; i++) {
+     int keyHash = new PrimaryKey(new Object[]{_stringSVValues[i]}).hashCode();
+     // The int hash is widened to double by the array initializer.
+     expectedDoubleMVValues[i] = new double[]{keyHash, keyHash};
+   }
+   testTransformFunctionMV(transformFunction, expectedDoubleMVValues);
+ }
+
+ @Test
+ public void primaryKeyTypeTest()
+ throws Exception {
+ // preparing simple tables for testing different primary key types (INT,
STRING, LONG)
+ Map<String, FieldSpec.DataType> testTables = new HashMap<String,
FieldSpec.DataType>() {{
+ put("dimTableWithIntPK_OFFLINE", FieldSpec.DataType.INT);
+ put("dimTableWithStringPK_OFFLINE", FieldSpec.DataType.STRING);
+ put("dimTableWithLongPK_OFFLINE", FieldSpec.DataType.LONG);
+ put("dimTableWithFloatPK_OFFLINE", FieldSpec.DataType.FLOAT);
+ put("dimTableWithDoublePK_OFFLINE", FieldSpec.DataType.DOUBLE);
+ put("dimTableWithBytesPK_OFFLINE", FieldSpec.DataType.BYTES);
+ }};
+ for (Map.Entry<String, FieldSpec.DataType> table : testTables.entrySet()) {
+ DimensionTableDataManager mgr = mock(DimensionTableDataManager.class);
+ DimensionTableDataManager.registerDimensionTable(table.getKey(), mgr);
+
when(mgr.getPrimaryKeyColumns()).thenReturn(Arrays.asList("primaryColumn"));
+ when(mgr.getColumnFieldSpec("primaryColumn"))
+ .thenReturn(new DimensionFieldSpec("primaryColumn",
table.getValue(), true));
+ when(mgr.getColumnFieldSpec("lookupColumn"))
+ .thenReturn(new DimensionFieldSpec("lookupColumn",
FieldSpec.DataType.STRING, true));
+
when(mgr.lookupRowByPrimaryKey(any(PrimaryKey.class))).thenAnswer(invocation ->
{
+ PrimaryKey key = invocation.getArgument(0);
+ GenericRow row = new GenericRow();
+ row.putValue("lookupColumn", String.format("lookup_value_for_[%s]",
key.hashCode()));
+ return row;
+ });
+ }
+
+ // PK: [Int]
+ ExpressionContext expression = QueryContextConverterUtils.getExpression(
+ String.format("lookup('dimTableWithIntPK', 'lookupColumn',
'primaryColumn', %s)", INT_SV_COLUMN));
+ TransformFunction transformFunction =
TransformFunctionFactory.get(expression, _dataSourceMap);
+ String[] expectedResults = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ PrimaryKey key = new PrimaryKey(new Object[]{(Integer)_intSVValues[i]});
+ expectedResults[i] = String.format("lookup_value_for_[%s]",
key.hashCode());
+ }
+ testTransformFunction(transformFunction, expectedResults);
+
+ // PK: [String]
+ expression = QueryContextConverterUtils.getExpression(
+ String.format("lookup('dimTableWithStringPK', 'lookupColumn',
'primaryColumn', %s)", STRING_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ expectedResults = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ PrimaryKey key = new PrimaryKey(new Object[]{_stringSVValues[i]});
+ expectedResults[i] = String.format("lookup_value_for_[%s]",
key.hashCode());
+ }
+ testTransformFunction(transformFunction, expectedResults);
+
+ // PK: [Long]
+ expression = QueryContextConverterUtils.getExpression(
+ String.format("lookup('dimTableWithLongPK', 'lookupColumn',
'primaryColumn', %s)", LONG_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ expectedResults = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ PrimaryKey key = new PrimaryKey(new Object[]{(Long)_longSVValues[i]});
+ expectedResults[i] = String.format("lookup_value_for_[%s]",
key.hashCode());
+ }
+ testTransformFunction(transformFunction, expectedResults);
+
+ // PK: [Float]
+ expression = QueryContextConverterUtils.getExpression(
+ String.format("lookup('dimTableWithFloatPK', 'lookupColumn',
'primaryColumn', %s)", FLOAT_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ expectedResults = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ PrimaryKey key = new PrimaryKey(new Object[]{(Float)_floatSVValues[i]});
+ expectedResults[i] = String.format("lookup_value_for_[%s]",
key.hashCode());
+ }
+ testTransformFunction(transformFunction, expectedResults);
+
+ // PK: [Double]
+ expression = QueryContextConverterUtils.getExpression(
+ String.format("lookup('dimTableWithDoublePK', 'lookupColumn',
'primaryColumn', %s)", DOUBLE_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ expectedResults = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ PrimaryKey key = new PrimaryKey(new
Object[]{(Double)_doubleSVValues[i]});
+ expectedResults[i] = String.format("lookup_value_for_[%s]",
key.hashCode());
+ }
+ testTransformFunction(transformFunction, expectedResults);
+
+ // PK: [Byte[]]
+ expression = QueryContextConverterUtils.getExpression(
+ String.format("lookup('dimTableWithBytesPK', 'lookupColumn',
'primaryColumn', %s)", BYTES_SV_COLUMN));
+ transformFunction = TransformFunctionFactory.get(expression,
_dataSourceMap);
+ expectedResults = new String[NUM_ROWS];
+ for (int i = 0; i < NUM_ROWS; i++) {
+ PrimaryKey key = new PrimaryKey(new Object[]{new
ByteArray(_bytesSVValues[i])});
+ expectedResults[i] = String.format("lookup_value_for_[%s]",
key.hashCode());
+ }
+ testTransformFunction(transformFunction, expectedResults);
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
index a4e6340..01fc669 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/JoinQuickStart.java
@@ -118,6 +118,13 @@ public class JoinQuickStart
printStatus(Quickstart.Color.YELLOW,
prettyPrintResponse(runner.runQuery(q2)));
printStatus(Quickstart.Color.GREEN,
"***************************************************");
+ String q3 = "select playerName, teamID, lookup('dimBaseballTeams',
'teamName', 'teamID', teamID) from baseballStats limit 10";
+ printStatus(Quickstart.Color.YELLOW, "Baseball Stats with joined team
names");
+ printStatus(Quickstart.Color.CYAN, "Query : " + q3);
+ printStatus(Quickstart.Color.YELLOW,
prettyPrintResponse(runner.runQuery(q3)));
+ printStatus(Quickstart.Color.GREEN,
"***************************************************");
+
+
printStatus(Quickstart.Color.GREEN, "You can always go to
http://localhost:9000 to play around in the query console");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]