This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 09db4d9  Adding basic null predicate support (#4943)
09db4d9 is described below

commit 09db4d91ed11c871a53eeebd29a38aace767b9eb
Author: icefury71 <[email protected]>
AuthorDate: Fri Dec 20 16:47:36 2019 -0800

    Adding basic null predicate support (#4943)
    
    Adding null predicate support in reference to Issue #4230
    
    This PR adds limited support for "IS NULL" and "IS NOT NULL" filter predicates. Currently this only works for leaf filter predicates.
---
 .../pinot/common/request/FilterOperator.java       |  12 ++-
 .../apache/pinot/pql/parsers/Pql2AstListener.java  |   4 +-
 .../pinot/pql/parsers/pql2/ast/FilterKind.java     |   4 +-
 .../parsers/pql2/ast/IsNullPredicateAstNode.java   |  84 +++++++++++++++++
 pinot-common/src/thrift/request.thrift             |   4 +-
 .../org/apache/pinot/core/common/Predicate.java    |  10 +-
 .../core/common/predicate/IsNotNullPredicate.java  |  18 ++--
 .../core/common/predicate/IsNullPredicate.java     |  18 ++--
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  11 +++
 ...er.java => ThreadSafeMutableRoaringBitmap.java} |  34 ++++---
 .../invertedindex/RealtimeInvertedIndexReader.java |  25 +----
 .../RealtimeNullValueVectorReaderWriter.java       |  13 ++-
 .../index/readers/NullValueVectorReader.java       |   8 ++
 .../index/readers/NullValueVectorReaderImpl.java   |   4 +
 .../SegmentGenerationWithNullValueVectorTest.java  | 104 +++++++++++++++++++++
 15 files changed, 291 insertions(+), 62 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/request/FilterOperator.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/request/FilterOperator.java
index 72d57ba..661d16a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/request/FilterOperator.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/request/FilterOperator.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 /**
- * Autogenerated by Thrift Compiler (0.12.0)
+ * Autogenerated by Thrift Compiler (0.13.0)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
  *  @generated
@@ -30,7 +30,7 @@ package org.apache.pinot.common.request;
  * Filter Operator
  * 
  */
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-07-19")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2019-12-19")
 public enum FilterOperator implements org.apache.thrift.TEnum {
   AND(0),
   OR(1),
@@ -39,7 +39,9 @@ public enum FilterOperator implements org.apache.thrift.TEnum 
{
   RANGE(4),
   REGEXP_LIKE(5),
   NOT_IN(6),
-  IN(7);
+  IN(7),
+  IS_NULL(8),
+  IS_NOT_NULL(9);
 
   private final int value;
 
@@ -77,6 +79,10 @@ public enum FilterOperator implements 
org.apache.thrift.TEnum {
         return NOT_IN;
       case 7:
         return IN;
+      case 8:
+        return IS_NULL;
+      case 9:
+        return IS_NOT_NULL;
       default:
         return null;
     }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/Pql2AstListener.java 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/Pql2AstListener.java
index 2a617e5..9728d80 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/Pql2AstListener.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/Pql2AstListener.java
@@ -33,7 +33,7 @@ import org.apache.pinot.pql.parsers.pql2.ast.HavingAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.IdentifierAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.InPredicateAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.IntegerLiteralAstNode;
-import org.apache.pinot.pql.parsers.pql2.ast.IsPredicateAstNode;
+import org.apache.pinot.pql.parsers.pql2.ast.IsNullPredicateAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.LimitAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.OptionAstNode;
 import org.apache.pinot.pql.parsers.pql2.ast.OptionsAstNode;
@@ -137,7 +137,7 @@ public class Pql2AstListener extends PQL2BaseListener {
 
   @Override
   public void enterIsPredicate(@NotNull PQL2Parser.IsPredicateContext ctx) {
-    pushNode(new IsPredicateAstNode());
+    pushNode(new IsNullPredicateAstNode(ctx.isClause().NOT() != null));
   }
 
   @Override
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/FilterKind.java
 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/FilterKind.java
index baca493..c6fb016 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/FilterKind.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/FilterKind.java
@@ -30,5 +30,7 @@ public enum FilterKind {
   BETWEEN,
   IN,
   NOT_IN,
-  REGEXP_LIKE
+  REGEXP_LIKE,
+  IS_NULL,
+  IS_NOT_NULL
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsNullPredicateAstNode.java
 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsNullPredicateAstNode.java
new file mode 100644
index 0000000..dd1e6fc
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsNullPredicateAstNode.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.pql.parsers.pql2.ast;
+
+import java.util.Collections;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.HavingQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.pql.parsers.Pql2CompilationException;
+
+
+/**
+ * AST node for IS predicates (foo IS NULL, foo IS NOT NULL).
+ */
+public class IsNullPredicateAstNode extends PredicateAstNode {
+
+  private final boolean _isNegation;
+
+  public IsNullPredicateAstNode(boolean notNodeExists) {
+    _isNegation = notNodeExists;
+  }
+
+  @Override
+  public void addChild(AstNode childNode) {
+    if (childNode instanceof IdentifierAstNode) {
+      if (_identifier == null) {
+        _identifier = ((IdentifierAstNode) childNode).getName();
+      } else {
+        throw new Pql2CompilationException("Only one column supported in IS 
predicate.");
+      }
+    } else if (childNode instanceof FunctionCallAstNode) {
+      throw new Pql2CompilationException("Function not supported in IS 
predicate");
+    } else if (childNode instanceof LiteralAstNode) {
+      throw new Pql2CompilationException("Constants not supported in IS 
predicate");
+    } else {
+      super.addChild(childNode);
+    }
+  }
+
+  @Override
+  public FilterQueryTree buildFilterQueryTree() {
+    if (_identifier == null) {
+      throw new Pql2CompilationException("IS predicate has no identifier");
+    }
+    if (_isNegation) {
+      return new FilterQueryTree(_identifier, Collections.EMPTY_LIST, 
FilterOperator.IS_NOT_NULL, null);
+    }
+    return new FilterQueryTree(_identifier, Collections.EMPTY_LIST, 
FilterOperator.IS_NULL, null);
+  }
+
+  @Override
+  public Expression buildFilterExpression() {
+    if (_identifier == null) {
+      throw new Pql2CompilationException("IS predicate has no identifier");
+    }
+    String filterName = _isNegation ? FilterKind.IS_NOT_NULL.name() : 
FilterKind.IS_NULL.name();
+    Expression expression = RequestUtils.getFunctionExpression(filterName);
+    
expression.getFunctionCall().addToOperands(RequestUtils.createIdentifierExpression(_identifier));
+    return expression;
+  }
+
+  @Override
+  public HavingQueryTree buildHavingQueryTree() {
+    throw new Pql2CompilationException("IS NOT? NULL predicate is not 
supported in HAVING clause.");
+  }
+}
diff --git a/pinot-common/src/thrift/request.thrift 
b/pinot-common/src/thrift/request.thrift
index 29cd461..50bcd43 100644
--- a/pinot-common/src/thrift/request.thrift
+++ b/pinot-common/src/thrift/request.thrift
@@ -31,7 +31,9 @@ enum FilterOperator {
   RANGE,
   REGEXP_LIKE,
   NOT_IN,
-  IN
+  IN,
+  IS_NULL,
+  IS_NOT_NULL
 }
 
 /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
index 4140017..d528e98 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/Predicate.java
@@ -23,6 +23,8 @@ import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.core.common.predicate.EqPredicate;
 import org.apache.pinot.core.common.predicate.InPredicate;
+import org.apache.pinot.core.common.predicate.IsNotNullPredicate;
+import org.apache.pinot.core.common.predicate.IsNullPredicate;
 import org.apache.pinot.core.common.predicate.NEqPredicate;
 import org.apache.pinot.core.common.predicate.NotInPredicate;
 import org.apache.pinot.core.common.predicate.RangePredicate;
@@ -32,7 +34,7 @@ import 
org.apache.pinot.core.common.predicate.RegexpLikePredicate;
 public abstract class Predicate {
 
   public enum Type {
-    EQ, NEQ, REGEXP_LIKE, RANGE, IN, NOT_IN;
+    EQ, NEQ, REGEXP_LIKE, RANGE, IN, NOT_IN, IS_NULL, IS_NOT_NULL;
 
     public boolean isExclusive() {
       return this == NEQ || this == NOT_IN;
@@ -92,6 +94,12 @@ public abstract class Predicate {
       case IN:
         predicate = new InPredicate(column, value);
         break;
+      case IS_NULL:
+        predicate = new IsNullPredicate(column);
+        break;
+      case IS_NOT_NULL:
+        predicate = new IsNotNullPredicate(column);
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported filterType:" + 
filterType);
     }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsPredicateAstNode.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/predicate/IsNotNullPredicate.java
similarity index 69%
copy from 
pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsPredicateAstNode.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/common/predicate/IsNotNullPredicate.java
index 9cb001e..8318c31 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsPredicateAstNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/predicate/IsNotNullPredicate.java
@@ -16,16 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.pql.parsers.pql2.ast;
+package org.apache.pinot.core.common.predicate;
 
-import org.apache.pinot.pql.parsers.Pql2CompilationException;
+import org.apache.pinot.core.common.Predicate;
 
 
-/**
- * AST node for IS predicates (foo IS NULL, foo IS NOT NULL).
- */
-public class IsPredicateAstNode extends BaseAstNode {
-  public IsPredicateAstNode() {
-    throw new Pql2CompilationException("IS predicate is not supported");
+public class IsNotNullPredicate extends Predicate {
+  public IsNotNullPredicate(String column) {
+    super(column, Type.IS_NOT_NULL, null);
+  }
+
+  @Override
+  public String toString() {
+    return "Predicate: type: " + getType() + ", left : " + getLhs() + "\n";
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsPredicateAstNode.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/predicate/IsNullPredicate.java
similarity index 70%
rename from 
pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsPredicateAstNode.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/common/predicate/IsNullPredicate.java
index 9cb001e..3318842 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/IsPredicateAstNode.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/predicate/IsNullPredicate.java
@@ -16,16 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.pql.parsers.pql2.ast;
+package org.apache.pinot.core.common.predicate;
 
-import org.apache.pinot.pql.parsers.Pql2CompilationException;
+import org.apache.pinot.core.common.Predicate;
 
 
-/**
- * AST node for IS predicates (foo IS NULL, foo IS NOT NULL).
- */
-public class IsPredicateAstNode extends BaseAstNode {
-  public IsPredicateAstNode() {
-    throw new Pql2CompilationException("IS predicate is not supported");
+public class IsNullPredicate extends Predicate {
+  public IsNullPredicate(String column) {
+    super(column, Type.IS_NULL, null);
+  }
+
+  @Override
+  public String toString() {
+    return "Predicate: type: " + getType() + ", left : " + getLhs() + "\n";
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 854a795..645c798 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.Predicate;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
+import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
 import org.apache.pinot.core.operator.filter.ExpressionFilterOperator;
 import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
@@ -44,6 +45,7 @@ import 
org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import 
org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -111,6 +113,15 @@ public class FilterPlanNode implements PlanNode {
       // Leaf filter operator
       Predicate predicate = Predicate.newPredicate(filterQueryTree);
 
+      // Check for null predicate
+      Predicate.Type type = predicate.getType();
+      if (type.equals(Predicate.Type.IS_NULL) || 
type.equals(Predicate.Type.IS_NOT_NULL)) {
+        DataSource dataSource = 
segment.getDataSource(filterQueryTree.getColumn());
+        ImmutableRoaringBitmap nullBitmap = 
dataSource.getNullValueVector().getNullBitmap();
+        boolean exclusive = (type == Predicate.Type.IS_NOT_NULL);
+        return new BitmapBasedFilterOperator(new 
ImmutableRoaringBitmap[]{nullBitmap}, 0, numDocs - 1, exclusive);
+      }
+
       TransformExpressionTree expression = filterQueryTree.getExpression();
       if (expression.getExpressionType() == 
TransformExpressionTree.ExpressionType.FUNCTION) {
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/ThreadSafeMutableRoaringBitmap.java
similarity index 50%
copy from 
pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/ThreadSafeMutableRoaringBitmap.java
index 341a4df..f69acd8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/ThreadSafeMutableRoaringBitmap.java
@@ -16,27 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.impl.nullvalue;
+package org.apache.pinot.core.realtime.impl;
 
-import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
- * Defines a real-time null value vector to be used in realtime ingestion.
+ * Helper wrapper class for {@link MutableRoaringBitmap} to make it 
thread-safe.
  */
-public class RealtimeNullValueVectorReaderWriter implements 
NullValueVectorReader {
-  private final MutableRoaringBitmap _nullBitmap;
+public class ThreadSafeMutableRoaringBitmap {
+  private MutableRoaringBitmap _mutableRoaringBitmap;
 
-  public RealtimeNullValueVectorReaderWriter() {
-    _nullBitmap = new MutableRoaringBitmap();
+  public ThreadSafeMutableRoaringBitmap() {
+    _mutableRoaringBitmap = new MutableRoaringBitmap();
   }
 
-  public void setNull(int docId) {
-    _nullBitmap.add(docId);
+  public ThreadSafeMutableRoaringBitmap(int firstDocId) {
+    _mutableRoaringBitmap = new MutableRoaringBitmap();
+    _mutableRoaringBitmap.add(firstDocId);
   }
 
-  public boolean isNull(int docId) {
-    return _nullBitmap.contains(docId);
+  public void checkAndAdd(int docId) {
+    if (!_mutableRoaringBitmap.contains(docId)) {
+      synchronized (this) {
+        _mutableRoaringBitmap.add(docId);
+      }
+    }
+  }
+
+  public boolean contains(int docId) {
+    return _mutableRoaringBitmap.contains(docId);
+  }
+
+  public synchronized MutableRoaringBitmap getMutableRoaringBitmap() {
+    return _mutableRoaringBitmap.clone();
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
index 1ff94f6..e66ba76 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.realtime.impl.invertedindex;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -81,28 +82,4 @@ public class RealtimeInvertedIndexReader implements 
InvertedIndexReader<MutableR
   @Override
   public void close() {
   }
-
-  /**
-   * Helper wrapper class for {@link MutableRoaringBitmap} to make it 
thread-safe.
-   */
-  private static class ThreadSafeMutableRoaringBitmap {
-    private MutableRoaringBitmap _mutableRoaringBitmap;
-
-    public ThreadSafeMutableRoaringBitmap(int firstDocId) {
-      _mutableRoaringBitmap = new MutableRoaringBitmap();
-      _mutableRoaringBitmap.add(firstDocId);
-    }
-
-    public void checkAndAdd(int docId) {
-      if (!_mutableRoaringBitmap.contains(docId)) {
-        synchronized (this) {
-          _mutableRoaringBitmap.add(docId);
-        }
-      }
-    }
-
-    public synchronized MutableRoaringBitmap getMutableRoaringBitmap() {
-      return _mutableRoaringBitmap.clone();
-    }
-  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
index 341a4df..0900b55 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/nullvalue/RealtimeNullValueVectorReaderWriter.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.realtime.impl.nullvalue;
 
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -26,17 +28,22 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  * Defines a real-time null value vector to be used in realtime ingestion.
  */
 public class RealtimeNullValueVectorReaderWriter implements 
NullValueVectorReader {
-  private final MutableRoaringBitmap _nullBitmap;
+  private final ThreadSafeMutableRoaringBitmap _nullBitmap;
 
   public RealtimeNullValueVectorReaderWriter() {
-    _nullBitmap = new MutableRoaringBitmap();
+    _nullBitmap = new ThreadSafeMutableRoaringBitmap();
   }
 
   public void setNull(int docId) {
-    _nullBitmap.add(docId);
+    _nullBitmap.checkAndAdd(docId);
   }
 
   public boolean isNull(int docId) {
     return _nullBitmap.contains(docId);
   }
+
+  @Override
+  public ImmutableRoaringBitmap getNullBitmap() {
+    return _nullBitmap.getMutableRoaringBitmap();
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReader.java
index 262a79a..8ad0ded 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReader.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.segment.index.readers;
 
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
 /**
  * Reader interface to read from an underlying Null value vector. This is
  * primarily used to check if a particular column value corresponding to
@@ -32,4 +35,9 @@ public interface NullValueVectorReader {
    * @return true if docId is absent (null). False otherwise
    */
   boolean isNull(int docId);
+
+  /**
+   * Return the underlying null bitmap (used in query execution)
+   */
+  ImmutableRoaringBitmap getNullBitmap();
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReaderImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReaderImpl.java
index 1f9d3fd..1f65473 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReaderImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/NullValueVectorReaderImpl.java
@@ -36,4 +36,8 @@ public class NullValueVectorReaderImpl implements 
NullValueVectorReader {
     return _nullBitmap.contains(docId);
   }
 
+  @Override
+  public ImmutableRoaringBitmap getNullBitmap() {
+    return _nullBitmap;
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
index aeb5bb4..a4ee984 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.core.segment.index.creator;
 
+import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,7 +29,23 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -46,6 +64,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.core.segment.index.creator.RawIndexCreatorTest.getRandomValue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 /**
@@ -57,6 +77,7 @@ public class SegmentGenerationWithNullValueVectorTest {
   private static final String SEGMENT_DIR_NAME =
       System.getProperty("java.io.tmpdir") + File.separator + 
"nullValueVectorTest";
   private static final String SEGMENT_NAME = "testSegment";
+  private static final long LONG_VALUE_THRESHOLD = 100;
 
   private Random _random;
   private Schema _schema;
@@ -70,6 +91,18 @@ public class SegmentGenerationWithNullValueVectorTest {
 
   Map<String, boolean[]> _actualNullVectorMap = new HashMap<>();
 
+  // Required for subsequent queries
+  private static final Pql2Compiler COMPILER = new Pql2Compiler();
+  private final List<String> _segmentNames = new ArrayList<>();
+  private InstanceDataManager _instanceDataManager;
+  private ServerMetrics _serverMetrics;
+  private QueryExecutor _queryExecutor;
+  private static final String TABLE_NAME = "testTable";
+  private static final String QUERY_EXECUTOR_CONFIG_PATH = 
"conf/query-executor.properties";
+  private static final ExecutorService QUERY_RUNNERS = 
Executors.newFixedThreadPool(20);
+  private int nullIntKeyCount = 0;
+  private int longKeyCount = 0;
+
   /**
    * Setup to build a segment with raw indexes (no-dictionary) of various data 
types.
    *
@@ -89,6 +122,37 @@ public class SegmentGenerationWithNullValueVectorTest {
     _random = new Random(System.nanoTime());
     buildIndex(_schema);
     _segment = ImmutableSegmentLoader.load(new File(SEGMENT_DIR_NAME, 
SEGMENT_NAME), ReadMode.heap);
+
+    setupQueryServer();
+  }
+
+  // Registers the segment and initializes Query Executor
+  private void setupQueryServer()
+      throws ConfigurationException {
+    _segmentNames.add(_segment.getSegmentName());
+    // Mock the instance data manager
+    _serverMetrics = new ServerMetrics(new MetricsRegistry());
+    TableDataManagerConfig tableDataManagerConfig = 
mock(TableDataManagerConfig.class);
+    
when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
+    when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
+    
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    @SuppressWarnings("unchecked")
+    TableDataManager tableDataManager = TableDataManagerProvider
+        .getTableDataManager(tableDataManagerConfig, "testInstance", 
mock(ZkHelixPropertyStore.class),
+            mock(ServerMetrics.class));
+    tableDataManager.start();
+    tableDataManager.addSegment(_segment);
+    _instanceDataManager = mock(InstanceDataManager.class);
+    
when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
+
+    // Set up the query executor
+    URL resourceUrl = 
getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
+    Assert.assertNotNull(resourceUrl);
+    PropertiesConfiguration queryExecutorConfig = new 
PropertiesConfiguration();
+    queryExecutorConfig.setDelimiterParsingDisabled(false);
+    queryExecutorConfig.load(new File(resourceUrl.getFile()));
+    _queryExecutor = new ServerQueryExecutorV1Impl();
+    _queryExecutor.init(queryExecutorConfig, _instanceDataManager, 
_serverMetrics);
   }
 
   /**
@@ -137,6 +201,15 @@ public class SegmentGenerationWithNullValueVectorTest {
           _actualNullVectorMap.get(key)[rowId] = true;
         }
       }
+
+      if (_actualNullVectorMap.get(INT_COLUMN)[rowId]) {
+        nullIntKeyCount++;
+      } else if (!_actualNullVectorMap.get(LONG_COLUMN)[rowId]) {
+        if ((long)map.get(LONG_COLUMN) > LONG_VALUE_THRESHOLD) {
+          longKeyCount++;
+        }
+      }
+
       genericRow.init(map);
       rows.add(genericRow);
     }
@@ -166,6 +239,37 @@ public class SegmentGenerationWithNullValueVectorTest {
     }
   }
 
+  @Test
+  public void testNotNullPredicate() {
+    String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + 
INT_COLUMN + " IS NOT NULL";
+    InstanceRequest instanceRequest = new InstanceRequest(0L, 
COMPILER.compileToBrokerRequest(query));
+    instanceRequest.setSearchSegments(_segmentNames);
+    DataTable instanceResponse = 
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    Assert.assertEquals(instanceResponse.getLong(0, 0), NUM_ROWS - 
nullIntKeyCount);
+  }
+
+  @Test
+  public void testNullPredicate() {
+    String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + 
INT_COLUMN + " IS NULL";
+    InstanceRequest instanceRequest = new InstanceRequest(0L, 
COMPILER.compileToBrokerRequest(query));
+    instanceRequest.setSearchSegments(_segmentNames);
+    DataTable instanceResponse = 
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    Assert.assertEquals(instanceResponse.getLong(0, 0), nullIntKeyCount);
+  }
+
+  @Test
+  public void testNullWithAndPredicate() {
+    String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + 
INT_COLUMN + " IS NOT NULL and " + LONG_COLUMN + " > " + LONG_VALUE_THRESHOLD;
+    InstanceRequest instanceRequest = new InstanceRequest(0L, 
COMPILER.compileToBrokerRequest(query));
+    instanceRequest.setSearchSegments(_segmentNames);
+    DataTable instanceResponse = 
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    Assert.assertEquals(instanceResponse.getLong(0, 0), longKeyCount);
+  }
+
+  private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
+    return new ServerQueryRequest(instanceRequest, _serverMetrics, 
System.currentTimeMillis());
+  }
+
   /**
    * Clean up after test
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to