This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d1227e466d [feature] [null support # 1] selection only literal in
broker null support (#10376)
d1227e466d is described below
commit d1227e466dc6e350214485334cdf4eedc7502d67
Author: Yao Liu <[email protected]>
AuthorDate: Fri Mar 24 20:48:34 2023 -0700
[feature] [null support # 1] selection only literal in broker null support
(#10376)
* selection only literal in broker null support
* fix test
* address comments
* address comment
* move null to last
* address comments
---
.../requesthandler/BaseBrokerRequestHandler.java | 4 +
.../pinot/common/function/FunctionInvoker.java | 12 +++
.../request/context/RequestContextUtils.java | 4 +
.../org/apache/pinot/common/utils/DataSchema.java | 8 +-
.../pinot/common/utils/request/RequestUtils.java | 13 ++-
.../rewriter/CompileTimeFunctionsInvoker.java | 8 +-
.../core/common/datablock/DataBlockBuilder.java | 8 ++
.../core/common/datablock/DataBlockTestUtils.java | 5 ++
.../core/common/datatable/DataTableSerDeTest.java | 7 ++
.../tests/NullHandlingIntegrationTest.java | 100 +++++++++++++++++++++
.../java/org/apache/pinot/spi/data/FieldSpec.java | 3 +-
11 files changed, 168 insertions(+), 4 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5eaa87a2f9..7112e0ee58 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -1347,6 +1347,10 @@ public abstract class BaseBrokerRequestHandler
implements BrokerRequestHandler {
columnTypes.add(DataSchema.ColumnDataType.BYTES);
row.add(BytesUtils.toHexString(literal.getBinaryValue()));
break;
+ case NULL_VALUE:
+ columnTypes.add(DataSchema.ColumnDataType.UNKNOWN);
+ row.add(null);
+ break;
default:
break;
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
index 98fbbf0635..59a91ca76e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionInvoker.java
@@ -32,12 +32,17 @@ import org.apache.pinot.common.utils.PinotDataType;
*/
public class FunctionInvoker {
private final Method _method;
+ // If true, the function should return null if any of its arguments is null
+ // Otherwise, the function should deal with null in its own implementation.
+ private final boolean _isNullIntolerant;
+
private final Class<?>[] _parameterClasses;
private final PinotDataType[] _parameterTypes;
private final Object _instance;
public FunctionInvoker(FunctionInfo functionInfo) {
_method = functionInfo.getMethod();
+ _isNullIntolerant = !functionInfo.hasNullableParameters();
Class<?>[] parameterClasses = _method.getParameterTypes();
int numParameters = parameterClasses.length;
_parameterClasses = new Class<?>[numParameters];
@@ -123,6 +128,13 @@ public class FunctionInvoker {
* {@link #convertTypes(Object[])} to convert the argument types if needed
before calling this method.
*/
public Object invoke(Object[] arguments) {
+ if (_isNullIntolerant) {
+ for (Object arg : arguments) {
+ if (arg == null) {
+ return null;
+ }
+ }
+ }
try {
return _method.invoke(_instance, arguments);
} catch (Exception e) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
index ad2340942f..fc228a2b2b 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.EnumUtils;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.ExpressionType;
import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.IsNotNullPredicate;
@@ -230,6 +231,9 @@ public class RequestContextUtils {
throw new BadQueryRequestException(
"Pinot does not support column or function on the right-hand side of
the predicate");
}
+ if (thriftExpression.getLiteral().getSetField() ==
Literal._Fields.NULL_VALUE) {
+ return "null";
+ }
return thriftExpression.getLiteral().getFieldValue().toString();
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index a044f5dd60..d8fedfc3d7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -209,7 +209,8 @@ public class DataSchema {
BOOLEAN_ARRAY(INT_ARRAY, new int[0]),
TIMESTAMP_ARRAY(LONG_ARRAY, new long[0]),
STRING_ARRAY(new String[0]),
- BYTES_ARRAY(new byte[0][]);
+ BYTES_ARRAY(new byte[0][]),
+ UNKNOWN(null);
private static final EnumSet<ColumnDataType> NUMERIC_TYPES =
EnumSet.of(INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL);
private static final Ordering<ColumnDataType> NUMERIC_TYPE_ORDERING =
Ordering.explicit(INT, LONG, FLOAT, DOUBLE);
@@ -315,6 +316,8 @@ public class DataSchema {
return DataType.JSON;
case BYTES:
return DataType.BYTES;
+ case UNKNOWN:
+ return DataType.UNKNOWN;
default:
throw new IllegalStateException(String.format("Cannot convert
ColumnDataType: %s to DataType", this));
}
@@ -361,6 +364,7 @@ public class DataSchema {
return toTimestampArray(value);
case BYTES_ARRAY:
return (byte[][]) value;
+ case UNKNOWN: // fall through
case OBJECT:
return (Serializable) value;
default:
@@ -525,6 +529,8 @@ public class DataSchema {
return JSON;
case BYTES:
return BYTES;
+ case UNKNOWN:
+ return UNKNOWN;
default:
throw new IllegalStateException("Unsupported data type: " +
dataType);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index a60fd4d8db..26f8cbc32a 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -122,11 +122,13 @@ public class RequestUtils {
literal.setDoubleValue(node.bigDecimalValue().doubleValue());
}
} else {
- // TODO: Support null literal and other types.
switch (node.getTypeName()) {
case BOOLEAN:
literal.setBoolValue(node.booleanValue());
break;
+ case NULL:
+ literal.setNullValue(true);
+ break;
default:
literal.setStringValue(StringUtils.replace(node.toValue(), "''",
"'"));
break;
@@ -173,7 +175,16 @@ public class RequestUtils {
return expression;
}
+ public static Expression getNullLiteralExpression() {
+ Expression expression = createNewLiteralExpression();
+ expression.getLiteral().setNullValue(true);
+ return expression;
+ }
+
public static Expression getLiteralExpression(Object object) {
+ if (object == null) {
+ return getNullLiteralExpression();
+ }
if (object instanceof Integer || object instanceof Long) {
return RequestUtils.getLiteralExpression(((Number) object).longValue());
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CompileTimeFunctionsInvoker.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CompileTimeFunctionsInvoker.java
index 1c93d64e85..2cb89d5fe8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CompileTimeFunctionsInvoker.java
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CompileTimeFunctionsInvoker.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.function.FunctionInvoker;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.sql.parsers.SqlCompilationException;
@@ -74,7 +75,12 @@ public class CompileTimeFunctionsInvoker implements
QueryRewriter {
if (functionInfo != null) {
Object[] arguments = new Object[numOperands];
for (int i = 0; i < numOperands; i++) {
- arguments[i] =
function.getOperands().get(i).getLiteral().getFieldValue();
+ Literal literal = function.getOperands().get(i).getLiteral();
+ if (literal.isSetNullValue()) {
+ arguments[i] = null;
+ } else {
+ arguments[i] =
function.getOperands().get(i).getLiteral().getFieldValue();
+ }
}
try {
FunctionInvoker invoker = new FunctionInvoker(functionInfo);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 573b3beadf..0173a2a914 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -217,6 +217,9 @@ public class DataBlockBuilder {
ArrayCopyUtils.copy(timestamps, longs, length);
setColumn(rowBuilder, byteBuffer, longs);
break;
+ case UNKNOWN:
+ setColumn(rowBuilder, byteBuffer, (Object) null);
+ break;
default:
throw new IllegalStateException(
String.format("Unsupported data type: %s for column: %s",
rowBuilder._columnDataTypes[colId],
@@ -462,6 +465,11 @@ public class DataBlockBuilder {
setColumn(columnarBuilder, byteBuffer, (String[]) value);
}
break;
+ case UNKNOWN:
+ for (int rowId = 0; rowId < columnarBuilder._numRows; rowId++) {
+ setColumn(columnarBuilder, byteBuffer, (Object) null);
+ }
+ break;
default:
throw new IllegalStateException(
String.format("Unsupported data type: %s for column: %s",
columnarBuilder._columnDataTypes[colId],
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
index 67cc82cba6..8d0f10058b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTestUtils.java
@@ -127,6 +127,9 @@ public class DataBlockTestUtils {
}
row[colId] = timestampArray;
break;
+ case UNKNOWN:
+ row[colId] = null;
+ break;
default:
throw new UnsupportedOperationException("Can't fill random data for
column type: " + columnDataTypes[colId]);
}
@@ -173,6 +176,8 @@ public class DataBlockTestUtils {
return dataBlock.getDoubleArray(rowId, colId);
case STRING_ARRAY:
return dataBlock.getStringArray(rowId, colId);
+ case UNKNOWN:
+ return null;
default:
throw new UnsupportedOperationException("Can't retrieve data for
column type: " + columnDataType);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 9b53e87354..6fece827ec 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -680,6 +680,9 @@ public class DataTableSerDeTest {
STRING_ARRAYS[rowId] = stringArray;
dataTableBuilder.setColumn(colId, stringArray);
break;
+ case UNKNOWN:
+ dataTableBuilder.setColumn(colId, (Object) null);
+ break;
default:
throw new UnsupportedOperationException("Unable to generate random
data for: " + columnDataTypes[colId]);
}
@@ -773,6 +776,10 @@ public class DataTableSerDeTest {
Assert.assertTrue(Arrays.equals(newDataTable.getStringArray(rowId,
colId), STRING_ARRAYS[rowId]),
ERROR_MESSAGE);
break;
+ case UNKNOWN:
+ Object nulValue = newDataTable.getCustomObject(rowId, colId);
+ Assert.assertNull(nulValue, ERROR_MESSAGE);
+ break;
default:
throw new UnsupportedOperationException("Unable to generate random
data for: " + columnDataTypes[colId]);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index 363e80d8a2..c93b37835f 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.integration.tests;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.List;
import javax.annotation.Nullable;
@@ -29,6 +30,9 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
/**
* Integration test that creates a Kafka broker, creates a Pinot cluster that
consumes from Kafka and queries Pinot.
@@ -202,4 +206,100 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet {
testQuery(pinotQuery, h2Query);
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
}
+
+ @Test
+ public void testNullLiteralSelectionOnlyBroker()
+ throws Exception {
+ DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+ // Null literal only
+ String sqlQuery = "SELECT null FROM mytable
OPTION(enableNullHandling=true)";
+ JsonNode response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ JsonNode rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asText(), "null");
+
+ // Null related functions
+ sqlQuery = "SELECT isNull(null) FROM " + getTableName() + " OPTION
(enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asBoolean(), true);
+
+ sqlQuery = "SELECT isNotNull(null) FROM " + getTableName() + " OPTION
(enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asBoolean(), false);
+
+
+ sqlQuery = "SELECT coalesce(null, 1) FROM " + getTableName() + " OPTION
(enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asInt(), 1);
+
+ sqlQuery = "SELECT coalesce(null, null) FROM " + getTableName() + "
OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asText(), "null");
+
+ sqlQuery = "SELECT isDistinctFrom(null, null) FROM " + getTableName() + "
OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asBoolean(), false);
+
+ sqlQuery = "SELECT isNotDistinctFrom(null, null) FROM " + getTableName() +
" OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asBoolean(), true);
+
+
+ sqlQuery = "SELECT isDistinctFrom(null, 1) FROM " + getTableName() + "
OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asBoolean(), true);
+
+ sqlQuery = "SELECT isNotDistinctFrom(null, 1) FROM " + getTableName() + "
OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asBoolean(), false);
+
+ sqlQuery = "SELECT case when true then null end FROM " + getTableName() +
" OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asText(), "null");
+
+
+ sqlQuery = "SELECT case when false then 1 end FROM " + getTableName() + "
OPTION (enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asText(), "null");
+
+
+ // Null intolerant functions
+ sqlQuery = "SELECT add(null, 1) FROM " + getTableName() + " OPTION
(enableNullHandling=true);";
+ response = postQuery(sqlQuery, _brokerBaseApiUrl);
+ rows = response.get("resultTable").get("rows");
+ assertTrue(response.get("exceptions").isEmpty());
+ assertEquals(rows.size(), 1);
+ assertEquals(rows.get(0).get(0).asText(), "null");
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index 9b3ea0862b..1c98aed964 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -397,7 +397,8 @@ public abstract class FieldSpec implements
Comparable<FieldSpec>, Serializable {
BYTES(false, false),
STRUCT(false, false),
MAP(false, false),
- LIST(false, false);
+ LIST(false, false),
+ UNKNOWN(false, true);
private final DataType _storedType;
private final int _size;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]