This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7510e6e  Fix potential NPEs in joins (#9760)
7510e6e is described below

commit 7510e6e722fa3829d809e7151b870ebcf947194c
Author: Suneet Saldanha <[email protected]>
AuthorDate: Wed Apr 29 11:03:13 2020 -0700

    Fix potential NPEs in joins (#9760)
    
    * Fix potential NPEs in joins
    
    IntelliJ reported issues with potential NPEs. This was first hit in testing
    with a filter being pushed down to the left hand table when joining against
    an indexed table.
    
    * More null check cleanup
    
    * Optimize filter value rewrite for IndexedTable
    
    * Add unit tests for LookupJoinable
    
    * Add tests for IndexedTableJoinable
    
    * Add non null assert for dimension selector
    
    * Suppress null warning in LookupJoinMatcher
    
    * remove some null checks on hot path
---
 processing/pom.xml                                 |   6 +
 .../druid/segment/filter/cnf/CalciteCnfHelper.java |   4 +-
 .../join/HashJoinSegmentStorageAdapter.java        |   1 +
 .../druid/segment/join/JoinConditionAnalysis.java  |  14 +-
 .../org/apache/druid/segment/join/Joinable.java    |   6 +-
 .../join/PossiblyNullDimensionSelector.java        |   5 +-
 .../segment/join/filter/JoinFilterAnalyzer.java    |  12 +-
 .../segment/join/lookup/LookupJoinMatcher.java     |   1 +
 .../druid/segment/join/lookup/LookupJoinable.java  |   8 +-
 .../table/IndexedTableColumnValueSelector.java     |   3 +
 .../segment/join/table/IndexedTableJoinable.java   |  16 +-
 .../druid/segment/transform/Transformer.java       |   1 +
 .../QueryableIndexVectorColumnSelectorFactory.java |   4 +
 .../MultiValueExpressionDimensionSelector.java     |   7 +-
 .../druid/segment/virtual/SingleInputBindings.java |   1 +
 .../segment/join/lookup/LookupJoinableTest.java    | 251 +++++++++++++++++++++
 .../join/table/IndexedTableJoinableTest.java       | 215 +++++++++++++++---
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |  13 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  39 ++++
 19 files changed, 547 insertions(+), 60 deletions(-)

diff --git a/processing/pom.xml b/processing/pom.xml
index ac3bdcf..8831041 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -194,6 +194,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>3.2.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>nl.jqno.equalsverifier</groupId>
             <artifactId>equalsverifier</artifactId>
             <scope>test</scope>
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/cnf/CalciteCnfHelper.java
 
b/processing/src/main/java/org/apache/druid/segment/filter/cnf/CalciteCnfHelper.java
index f857ae2..0685948 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/filter/cnf/CalciteCnfHelper.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/filter/cnf/CalciteCnfHelper.java
@@ -59,9 +59,7 @@ public class CalciteCnfHelper
       final List<Filter> list = new ArrayList<>();
       for (Filter operand : operands) {
         Filter removed = removeFactor(factors, operand);
-        if (removed != null) {
-          list.add(removed);
-        }
+        list.add(removed);
       }
       if (list.isEmpty()) {
         return and(factors.values());
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
 
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
index 46f522c..c854ac8 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java
@@ -236,6 +236,7 @@ public class HashJoinSegmentStorageAdapter implements 
StorageAdapter
     return Sequences.map(
         baseCursorSequence,
         cursor -> {
+          assert cursor != null;
           Cursor retVal = cursor;
 
           for (JoinableClause clause : clauses) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java
 
b/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java
index dddd618..69bb7af 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/JoinConditionAnalysis.java
@@ -79,7 +79,7 @@ public class JoinConditionAnalysis
                                                                 .allMatch(expr 
-> expr.isLiteral() && expr.eval(
                                                                     
ExprUtils.nilBindings()).asBoolean());
     canHashJoin = nonEquiConditions.stream().allMatch(Expr::isLiteral);
-    rightKeyColumns = 
getEquiConditions().stream().map(Equality::getRightColumn).distinct().collect(Collectors.toSet());
+    rightKeyColumns = 
getEquiConditions().stream().map(Equality::getRightColumn).collect(Collectors.toSet());
   }
 
   /**
@@ -108,14 +108,18 @@ public class JoinConditionAnalysis
         nonEquiConditions.add(childExpr);
       } else {
         final Pair<Expr, Expr> decomposed = maybeDecomposed.get();
-        final Expr lhs = decomposed.lhs;
-        final Expr rhs = decomposed.rhs;
+        final Expr lhs = Objects.requireNonNull(decomposed.lhs);
+        final Expr rhs = Objects.requireNonNull(decomposed.rhs);
 
         if (isLeftExprAndRightColumn(lhs, rhs, rightPrefix)) {
           // rhs is a right-hand column; lhs is an expression solely of the 
left-hand side.
-          equiConditions.add(new Equality(lhs, 
rhs.getBindingIfIdentifier().substring(rightPrefix.length())));
+          equiConditions.add(
+              new Equality(lhs, 
Objects.requireNonNull(rhs.getBindingIfIdentifier()).substring(rightPrefix.length()))
+          );
         } else if (isLeftExprAndRightColumn(rhs, lhs, rightPrefix)) {
-          equiConditions.add(new Equality(rhs, 
lhs.getBindingIfIdentifier().substring(rightPrefix.length())));
+          equiConditions.add(
+              new Equality(rhs, 
Objects.requireNonNull(lhs.getBindingIfIdentifier()).substring(rightPrefix.length()))
+          );
         } else {
           nonEquiConditions.add(childExpr);
         }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java 
b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java
index 7983f6f..1ededff 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java
@@ -81,9 +81,9 @@ public interface Joinable
    * Searches a column from this Joinable for a particular value, finds rows 
that match,
    * and returns values of a second column for those rows.
    *
-   * @param searchColumnName Name of the search column
-   * @param searchColumnValue Target value of the search column
-   * @param retrievalColumnName The column to retrieve values from
+   * @param searchColumnName Name of the search column. This is the column 
that is being used in the filter
+   * @param searchColumnValue Target value of the search column. This is the 
value that is being filtered on.
+   * @param retrievalColumnName The column to retrieve values from. This is 
the column that is being joined against.
    * @param maxCorrelationSetSize Maximum number of values to retrieve. If we 
detect that more values would be
    *                              returned than this limit, return an empty 
set.
    * @param allowNonKeyColumnSearch If true, allow searchs on non-key columns. 
If this is false,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/PossiblyNullDimensionSelector.java
 
b/processing/src/main/java/org/apache/druid/segment/join/PossiblyNullDimensionSelector.java
index b678b45..ab3fb87 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/PossiblyNullDimensionSelector.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/PossiblyNullDimensionSelector.java
@@ -138,7 +138,10 @@ public class PossiblyNullDimensionSelector extends 
AbstractDimensionSelector imp
       // id 0 is always null for this selector impl.
       return 0;
     } else {
-      return baseSelector.idLookup().lookupId(name) + nullAdjustment;
+      IdLookup idLookup = baseSelector.idLookup();
+      // idLookup is not null here because callers are expected to check this 
condition before calling lookupId
+      assert idLookup != null;
+      return idLookup.lookupId(name) + nullAdjustment;
     }
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
index 69e3b23..7b2032f 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
@@ -38,6 +38,7 @@ import org.apache.druid.segment.join.JoinConditionAnalysis;
 import org.apache.druid.segment.join.JoinableClause;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -175,9 +176,7 @@ public class JoinFilterAnalyzer
       for (Equality equality : clause.getCondition().getEquiConditions()) {
         Set<Expr> exprsForRhs = equiconditions.computeIfAbsent(
             clause.getPrefix() + equality.getRightColumn(),
-            (rhs) -> {
-              return new HashSet<>();
-            }
+            (rhs) -> new HashSet<>()
         );
         exprsForRhs.add(equality.getLeftExpr());
       }
@@ -263,9 +262,7 @@ public class JoinFilterAnalyzer
           Optional<List<JoinFilterColumnCorrelationAnalysis>> 
perColumnCorrelations =
               correlationsByFilteringColumn.computeIfAbsent(
                   rhsRewriteCandidate.getRhsColumn(),
-                  (rhsCol) -> {
-                    return Optional.of(new ArrayList<>());
-                  }
+                  (rhsCol) -> Optional.of(new ArrayList<>())
               );
           perColumnCorrelations.get().add(correlationForPrefix.getValue());
           
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
@@ -350,6 +347,7 @@ public class JoinFilterAnalyzer
           joinFilterPreAnalysis
       );
       if (joinFilterAnalysis.isCanPushDown()) {
+        //noinspection OptionalGetWithoutIsPresent isCanPushDown checks 
isPresent
         leftFilters.add(joinFilterAnalysis.getPushDownFilter().get());
         if (!joinFilterAnalysis.getPushDownVirtualColumns().isEmpty()) {
           
pushDownVirtualColumns.addAll(joinFilterAnalysis.getPushDownVirtualColumns());
@@ -438,6 +436,7 @@ public class JoinFilterAnalyzer
         if (!rewritten.isCanPushDown()) {
           return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
         } else {
+          //noinspection OptionalGetWithoutIsPresent isCanPushDown checks 
isPresent
           newFilters.add(rewritten.getPushDownFilter().get());
         }
       } else {
@@ -762,6 +761,7 @@ public class JoinFilterAnalyzer
     return valueMatcher.matches();
   }
 
+  @Nullable
   private static JoinableClause isColumnFromJoin(
       List<JoinableClause> joinableClauses,
       String column
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java
 
b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java
index 15995b1..624b6bc 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinMatcher.java
@@ -275,6 +275,7 @@ public class LookupJoinMatcher implements JoinMatcher
     } else if (condition.isAlwaysTrue()) {
       currentIterator = Collections.emptyIterator();
     } else {
+      //noinspection ConstantConditions - entry can not be null because 
extractor.iterable() prevents this
       currentIterator = Iterators.filter(
           extractor.iterable().iterator(),
           entry -> !matchedKeys.contains(entry.getKey())
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java
 
b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java
index 6b1b41a..353808d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java
@@ -31,6 +31,7 @@ import org.apache.druid.segment.join.JoinMatcher;
 import org.apache.druid.segment.join.Joinable;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -95,18 +96,23 @@ public class LookupJoinable implements Joinable
       boolean allowNonKeyColumnSearch
   )
   {
+    if (!ALL_COLUMNS.contains(searchColumnName) || 
!ALL_COLUMNS.contains(retrievalColumnName)) {
+      return ImmutableSet.of();
+    }
     Set<String> correlatedValues;
     if (LookupColumnSelectorFactory.KEY_COLUMN.equals(searchColumnName)) {
       if (LookupColumnSelectorFactory.KEY_COLUMN.equals(retrievalColumnName)) {
         correlatedValues = ImmutableSet.of(searchColumnValue);
       } else {
-        correlatedValues = ImmutableSet.of(extractor.apply(searchColumnName));
+        // This should not happen in practice because the column to be joined 
on must be a key.
+        correlatedValues = 
Collections.singleton(extractor.apply(searchColumnValue));
       }
     } else {
       if (!allowNonKeyColumnSearch) {
         return ImmutableSet.of();
       }
       if 
(LookupColumnSelectorFactory.VALUE_COLUMN.equals(retrievalColumnName)) {
+        // This should not happen in practice because the column to be joined 
on must be a key.
         correlatedValues = ImmutableSet.of(searchColumnValue);
       } else {
         // Lookup extractor unapply only provides a list of strings, so we 
can't respect
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java
index 814658c..5f6b07a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java
@@ -52,6 +52,7 @@ public class IndexedTableColumnValueSelector implements 
ColumnValueSelector<Obje
 
     // Otherwise this shouldn't have been called (due to isNull returning 
true).
     assert NullHandling.replaceWithDefault();
+    //noinspection ConstantConditions assert statement above guarantees this 
is non null.
     return NullHandling.defaultDoubleValue();
   }
 
@@ -70,6 +71,7 @@ public class IndexedTableColumnValueSelector implements 
ColumnValueSelector<Obje
 
     // Otherwise this shouldn't have been called (due to isNull returning 
true).
     assert NullHandling.replaceWithDefault();
+    //noinspection ConstantConditions assert statement above guarantees this 
is non null.
     return NullHandling.defaultFloatValue();
   }
 
@@ -88,6 +90,7 @@ public class IndexedTableColumnValueSelector implements 
ColumnValueSelector<Obje
 
     // Otherwise this shouldn't have been called (due to isNull returning 
true).
     assert NullHandling.replaceWithDefault();
+    //noinspection ConstantConditions assert statement above guarantees this 
is non null.
     return NullHandling.defaultLongValue();
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java
 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java
index 38cf5f8..a661b5a 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java
@@ -30,6 +30,7 @@ import org.apache.druid.segment.join.Joinable;
 import javax.annotation.Nullable;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
 public class IndexedTableJoinable implements Joinable
@@ -103,7 +104,8 @@ public class IndexedTableJoinable implements Joinable
       IntList rowIndex = index.find(searchColumnValue);
       for (int i = 0; i < rowIndex.size(); i++) {
         int rowNum = rowIndex.getInt(i);
-        correlatedValues.add(reader.read(rowNum).toString());
+        String correlatedDimVal = Objects.toString(reader.read(rowNum), null);
+        correlatedValues.add(correlatedDimVal);
 
         if (correlatedValues.size() > maxCorrelationSetSize) {
           return ImmutableSet.of();
@@ -118,11 +120,13 @@ public class IndexedTableJoinable implements Joinable
       IndexedTable.Reader dimNameReader = 
table.columnReader(filterColumnPosition);
       IndexedTable.Reader correlatedColumnReader = 
table.columnReader(correlatedColumnPosition);
       for (int i = 0; i < table.numRows(); i++) {
-        if (searchColumnValue.equals(dimNameReader.read(i).toString())) {
-          correlatedValues.add(correlatedColumnReader.read(i).toString());
-        }
-        if (correlatedValues.size() > maxCorrelationSetSize) {
-          return ImmutableSet.of();
+        String dimVal = Objects.toString(dimNameReader.read(i), null);
+        if (searchColumnValue.equals(dimVal)) {
+          String correlatedDimVal = 
Objects.toString(correlatedColumnReader.read(i), null);
+          correlatedValues.add(correlatedDimVal);
+          if (correlatedValues.size() > maxCorrelationSetSize) {
+            return ImmutableSet.of();
+          }
         }
       }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java 
b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java
index 48b0421..47d89b2 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/transform/Transformer.java
@@ -156,6 +156,7 @@ public class Transformer
     {
       final RowFunction transform = 
transforms.get(ColumnHolder.TIME_COLUMN_NAME);
       if (transform != null) {
+        //noinspection ConstantConditions time column is never null
         return Rows.objectToNumber(ColumnHolder.TIME_COLUMN_NAME, 
transform.eval(row), true).longValue();
       } else {
         return row.getTimestampFromEpoch();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
index 8311049..d816589 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/vector/QueryableIndexVectorColumnSelectorFactory.java
@@ -96,6 +96,8 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
           final DictionaryEncodedColumn<String> dictionaryEncodedColumn = 
(DictionaryEncodedColumn<String>)
               getCachedColumn(spec.getDimension());
 
+          // dictionaryEncodedColumn is not null because of holder null check 
above
+          assert dictionaryEncodedColumn != null;
           final MultiValueDimensionVectorSelector selector = 
dictionaryEncodedColumn.makeMultiValueDimensionVectorSelector(
               offset
           );
@@ -132,6 +134,8 @@ public class QueryableIndexVectorColumnSelectorFactory 
implements VectorColumnSe
           final DictionaryEncodedColumn<String> dictionaryEncodedColumn = 
(DictionaryEncodedColumn<String>)
               getCachedColumn(spec.getDimension());
 
+          // dictionaryEncodedColumn is not null because of holder null check 
above
+          assert dictionaryEncodedColumn != null;
           final SingleValueDimensionVectorSelector selector =
               
dictionaryEncodedColumn.makeSingleValueDimensionVectorSelector(offset);
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/MultiValueExpressionDimensionSelector.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/MultiValueExpressionDimensionSelector.java
index e3b5734..513631e 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/MultiValueExpressionDimensionSelector.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/MultiValueExpressionDimensionSelector.java
@@ -55,6 +55,7 @@ public class MultiValueExpressionDimensionSelector implements 
DimensionSelector
     return baseSelector.getObject();
   }
 
+  @Nullable
   String getValue(ExprEval evaluated)
   {
     assert !evaluated.isArray();
@@ -64,15 +65,18 @@ public class MultiValueExpressionDimensionSelector 
implements DimensionSelector
   List<String> getArray(ExprEval evaluated)
   {
     assert evaluated.isArray();
+    //noinspection ConstantConditions
     return Arrays.stream(evaluated.asStringArray())
                  .map(NullHandling::emptyToNullIfNeeded)
                  .collect(Collectors.toList());
   }
 
+  @Nullable
   String getArrayValue(ExprEval evaluated, int i)
   {
     assert evaluated.isArray();
     String[] stringArray = evaluated.asStringArray();
+    //noinspection ConstantConditions because of assert statement above
     assert i < stringArray.length;
     return NullHandling.emptyToNullIfNeeded(stringArray[i]);
   }
@@ -83,7 +87,8 @@ public class MultiValueExpressionDimensionSelector implements 
DimensionSelector
     ExprEval evaluated = getEvaluated();
     if (evaluated.isArray()) {
       RangeIndexedInts ints = new RangeIndexedInts();
-      ints.setSize(evaluated.asArray() != null ? evaluated.asArray().length : 
0);
+      Object[] evaluatedArray = evaluated.asArray();
+      ints.setSize(evaluatedArray != null ? evaluatedArray.length : 0);
       return ints;
     }
     return ZeroIndexedInts.instance();
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/SingleInputBindings.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/SingleInputBindings.java
index eecc9fe..2525e09 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/SingleInputBindings.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/SingleInputBindings.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 
 public class SingleInputBindings implements Expr.ObjectBinding
 {
+  @Nullable
   private Object value;
 
   @Override
diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java
 
b/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java
new file mode 100644
index 0000000..4115b84
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/join/lookup/LookupJoinableTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.lookup;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.query.lookup.LookupExtractor;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.join.Joinable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LookupJoinableTest
+{
+  private static final String UNKNOWN_COLUMN = "UNKNOWN_COLUMN";
+  private static final String SEARCH_KEY_VALUE = "SEARCH_KEY_VALUE";
+  private static final String SEARCH_KEY_NULL_VALUE = "SEARCH_KEY_NULL_VALUE";
+  private static final String SEARCH_VALUE_VALUE = "SEARCH_VALUE_VALUE";
+  private static final String SEARCH_VALUE_UNKNOWN = "SEARCH_VALUE_UNKNOWN";
+
+  @Mock
+  private LookupExtractor extractor;
+
+  private LookupJoinable target;
+
+  @Before
+  public void setUp()
+  {
+    
Mockito.doReturn(SEARCH_VALUE_VALUE).when(extractor).apply(SEARCH_KEY_VALUE);
+    
Mockito.doReturn(ImmutableList.of(SEARCH_KEY_VALUE)).when(extractor).unapply(SEARCH_VALUE_VALUE);
+    
Mockito.doReturn(ImmutableList.of()).when(extractor).unapply(SEARCH_VALUE_UNKNOWN);
+    target = LookupJoinable.wrap(extractor);
+  }
+
+  @Test
+  public void getAvailableColumnShouldReturnOnlyTwoColumns()
+  {
+    List<String> colummns = target.getAvailableColumns();
+    Assert.assertEquals(2, colummns.size());
+    Assert.assertEquals(
+        ImmutableList.of(LookupColumnSelectorFactory.KEY_COLUMN, 
LookupColumnSelectorFactory.VALUE_COLUMN),
+        colummns
+    );
+  }
+
+  @Test
+  public void getCardinalityForUnknownColumnShouldReturnUnknown()
+  {
+    int cardinality = target.getCardinality(UNKNOWN_COLUMN);
+    Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, cardinality);
+  }
+
+  @Test
+  public void getCardinalityForKeyColumnShouldReturnUnknown()
+  {
+    int cardinality = 
target.getCardinality(LookupColumnSelectorFactory.KEY_COLUMN);
+    Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, cardinality);
+  }
+
+  @Test
+  public void getCardinalityForValueColumnShouldReturnUnknown()
+  {
+    int cardinality = 
target.getCardinality(LookupColumnSelectorFactory.VALUE_COLUMN);
+    Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, cardinality);
+  }
+
+  @Test
+  public void getColumnCapabilitiesForKeyColumnShouldReturnStringCaps()
+  {
+    ColumnCapabilities capabilities = 
target.getColumnCapabilities(LookupColumnSelectorFactory.KEY_COLUMN);
+    Assert.assertEquals(ValueType.STRING, capabilities.getType());
+  }
+
+  @Test
+  public void getColumnCapabilitiesForValueColumnShouldReturnStringCaps()
+  {
+    ColumnCapabilities capabilities = 
target.getColumnCapabilities(LookupColumnSelectorFactory.VALUE_COLUMN);
+    Assert.assertEquals(ValueType.STRING, capabilities.getType());
+  }
+
+  @Test
+  public void getColumnCapabilitiesForUnknownColumnShouldReturnNull()
+  {
+    ColumnCapabilities capabilities = 
target.getColumnCapabilities(UNKNOWN_COLUMN);
+    Assert.assertNull(capabilities);
+  }
+
+  @Test
+  public void 
getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmptySet()
+  {
+    Set<String> correlatedValues =
+        target.getCorrelatedColumnValues(
+            UNKNOWN_COLUMN,
+            SEARCH_KEY_VALUE,
+            LookupColumnSelectorFactory.VALUE_COLUMN,
+            0,
+            false);
+
+    Assert.assertEquals(Collections.emptySet(), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmptySet()
+  {
+    Set<String> correlatedValues =
+        target.getCorrelatedColumnValues(
+            LookupColumnSelectorFactory.KEY_COLUMN,
+            SEARCH_KEY_VALUE,
+            UNKNOWN_COLUMN,
+            0,
+            false);
+
+    Assert.assertEquals(Collections.emptySet(), correlatedValues);
+  }
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnShouldReturnSearchValue()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        SEARCH_KEY_VALUE,
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        0,
+        false);
+    Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchKeyAndRetrieveValueColumnShouldReturnExtractedValue()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        SEARCH_KEY_VALUE,
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        0,
+        false);
+    Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchKeyMissingAndRetrieveValueColumnShouldReturnExtractedValue()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        SEARCH_KEY_NULL_VALUE,
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        0,
+        false);
+    Assert.assertEquals(Collections.singleton(null), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnSearchValue()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        10,
+        false);
+    Assert.assertEquals(ImmutableSet.of(), correlatedValues);
+    correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        10,
+        false);
+    Assert.assertEquals(ImmutableSet.of(), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnShouldReturnSearchValue()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        0,
+        true);
+    Assert.assertEquals(ImmutableSet.of(SEARCH_VALUE_VALUE), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnShouldReturnUnAppliedValue()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        10,
+        true);
+    Assert.assertEquals(ImmutableSet.of(SEARCH_KEY_VALUE), correlatedValues);
+  }
+
+  @Test
+  @Ignore
+  /**
+   * See {@link LookupJoinable#getCorrelatedColumnValues(String, String, 
String, long, boolean)} for implementation
+   * details that cause this test to fail.
+   */
+  public void 
getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnWithMaxLimitSetShouldHonorMaxLimit()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        0,
+        true);
+    Assert.assertEquals(ImmutableSet.of(), correlatedValues);
+  }
+
+  @Test
+  public void 
getCorrelatedColumnValuesForSearchUnknownValueAndRetrieveKeyColumnShouldReturnNoCorrelatedValues()
+  {
+    Set<String> correlatedValues = target.getCorrelatedColumnValues(
+        LookupColumnSelectorFactory.VALUE_COLUMN,
+        SEARCH_VALUE_UNKNOWN,
+        LookupColumnSelectorFactory.KEY_COLUMN,
+        10,
+        true);
+    Assert.assertEquals(ImmutableSet.of(), correlatedValues);
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java
 
b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java
index e8e3be4..89f78dc 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java
@@ -36,11 +36,23 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.join.JoinConditionAnalysis;
 import org.apache.druid.segment.join.JoinMatcher;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Set;
+
 public class IndexedTableJoinableTest
 {
   private static final String PREFIX = "j.";
+  private static final String KEY_COLUMN = "str";
+  private static final String VALUE_COLUMN = "long";
+  private static final String UNKNOWN_COLUMN = "unknown";
+  private static final String SEARCH_KEY_NULL_VALUE = "baz";
+  private static final String SEARCH_KEY_VALUE = "foo";
+  private static final String SEARCH_VALUE_VALUE = "1";
+  private static final String SEARCH_VALUE_UNKNOWN = "10";
+  private static final long MAX_CORRELATION_SET_SIZE = 10_000L;
 
   static {
     NullHandling.initializeForTests();
@@ -70,9 +82,13 @@ public class IndexedTableJoinableTest
   private final InlineDataSource inlineDataSource = 
InlineDataSource.fromIterable(
       ImmutableList.of(
           new Object[]{"foo", 1L},
-          new Object[]{"bar", 2L}
+          new Object[]{"bar", 2L},
+          new Object[]{"baz", null}
       ),
-      RowSignature.builder().add("str", ValueType.STRING).add("long", 
ValueType.LONG).build()
+      RowSignature.builder()
+                  .add("str", ValueType.STRING)
+                  .add("long", ValueType.LONG)
+                  .build()
   );
 
   private final RowBasedIndexedTable<Object[]> indexedTable = new 
RowBasedIndexedTable<>(
@@ -82,39 +98,41 @@ public class IndexedTableJoinableTest
       ImmutableSet.of("str")
   );
 
+  private IndexedTableJoinable target;
+
+  /**
+   * Runs before every test and points {@code target} at a new joinable built over {@code indexedTable}.
+   */
+  @Before
+  public void setUp()
+  {
+    target = new IndexedTableJoinable(indexedTable);
+  }
   @Test
-  public void test_getAvailableColumns()
+  public void getAvailableColumns()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    Assert.assertEquals(ImmutableList.of("str", "long"), 
joinable.getAvailableColumns());
+    Assert.assertEquals(ImmutableList.of("str", "long"), 
target.getAvailableColumns());
   }
 
   @Test
-  public void test_getCardinality_string()
+  public void getCardinalityForStringColumn()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    Assert.assertEquals(indexedTable.numRows() + 1, 
joinable.getCardinality("str"));
+    Assert.assertEquals(indexedTable.numRows() + 1, 
target.getCardinality("str"));
   }
 
   @Test
-  public void test_getCardinality_long()
+  public void getCardinalityForLongColumn()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    Assert.assertEquals(indexedTable.numRows() + 1, 
joinable.getCardinality("long"));
+    Assert.assertEquals(indexedTable.numRows() + 1, 
target.getCardinality("long"));
   }
 
   @Test
-  public void test_getCardinality_nonexistent()
+  public void getCardinalityForNonexistentColumn()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    Assert.assertEquals(1, joinable.getCardinality("nonexistent"));
+    Assert.assertEquals(1, target.getCardinality("nonexistent"));
   }
 
   @Test
-  public void test_getColumnCapabilities_string()
+  public void getColumnCapabilitiesForStringColumn()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    final ColumnCapabilities capabilities = 
joinable.getColumnCapabilities("str");
+    final ColumnCapabilities capabilities = 
target.getColumnCapabilities("str");
     Assert.assertEquals(ValueType.STRING, capabilities.getType());
     Assert.assertTrue(capabilities.isDictionaryEncoded());
     Assert.assertFalse(capabilities.hasBitmapIndexes());
@@ -124,10 +142,9 @@ public class IndexedTableJoinableTest
   }
 
   @Test
-  public void test_getColumnCapabilities_long()
+  public void getColumnCapabilitiesForLongColumn()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    final ColumnCapabilities capabilities = 
joinable.getColumnCapabilities("long");
+    final ColumnCapabilities capabilities = 
target.getColumnCapabilities("long");
     Assert.assertEquals(ValueType.LONG, capabilities.getType());
     Assert.assertFalse(capabilities.isDictionaryEncoded());
     Assert.assertFalse(capabilities.hasBitmapIndexes());
@@ -137,29 +154,27 @@ public class IndexedTableJoinableTest
   }
 
   @Test
-  public void test_getColumnCapabilities_nonexistent()
+  public void getColumnCapabilitiesForNonexistentColumnShouldReturnNull()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
-    final ColumnCapabilities capabilities = 
joinable.getColumnCapabilities("nonexistent");
+    final ColumnCapabilities capabilities = 
target.getColumnCapabilities("nonexistent");
     Assert.assertNull(capabilities);
   }
 
   @Test
-  public void test_makeJoinMatcher_dimensionSelectorOnString()
+  public void makeJoinMatcherWithDimensionSelectorOnString()
   {
-    final IndexedTableJoinable joinable = new 
IndexedTableJoinable(indexedTable);
     final JoinConditionAnalysis condition = 
JoinConditionAnalysis.forExpression(
         "x == \"j.str\"",
         PREFIX,
         ExprMacroTable.nil()
     );
-    final JoinMatcher joinMatcher = 
joinable.makeJoinMatcher(dummyColumnSelectorFactory, condition, false);
+    final JoinMatcher joinMatcher = 
target.makeJoinMatcher(dummyColumnSelectorFactory, condition, false);
 
     final DimensionSelector selector = joinMatcher.getColumnSelectorFactory()
                                                   
.makeDimensionSelector(DefaultDimensionSpec.of("str"));
 
     // getValueCardinality
-    Assert.assertEquals(3, selector.getValueCardinality());
+    Assert.assertEquals(4, selector.getValueCardinality());
 
     // nameLookupPossibleInAdvance
     Assert.assertTrue(selector.nameLookupPossibleInAdvance());
@@ -167,9 +182,153 @@ public class IndexedTableJoinableTest
     // lookupName
     Assert.assertEquals("foo", selector.lookupName(0));
     Assert.assertEquals("bar", selector.lookupName(1));
-    Assert.assertNull(selector.lookupName(2));
+    Assert.assertEquals("baz", selector.lookupName(2));
+    Assert.assertNull(selector.lookupName(3));
 
     // lookupId
     Assert.assertNull(selector.idLookup());
   }
+
+  /**
+   * Searching a column that is not part of the table ({@code UNKNOWN_COLUMN}) is expected to yield an
+   * empty set, whatever value is searched for.
+   */
+  @Test
+  public void getCorrelatedColummnValuesMissingSearchColumnShouldReturnEmptySet()
+  {
+    // Use the shared search-key constant, as the sibling tests do, instead of a bare literal.
+    Set<String> correlatedValues =
+        target.getCorrelatedColumnValues(
+            UNKNOWN_COLUMN,
+            SEARCH_KEY_VALUE,
+            VALUE_COLUMN,
+            MAX_CORRELATION_SET_SIZE,
+            false
+        );
+
+    Assert.assertEquals(Collections.emptySet(), correlatedValues);
+  }
+
+  /**
+   * Retrieving a column that is not part of the table ({@code UNKNOWN_COLUMN}) is expected to yield an
+   * empty set, even though the search key exists in the key column.
+   */
+  @Test
+  public void getCorrelatedColummnValuesMissingRetrievalColumnShouldReturnEmptySet()
+  {
+    // Use the shared search-key constant, as the sibling tests do, instead of a bare literal.
+    Set<String> correlatedValues =
+        target.getCorrelatedColumnValues(
+            KEY_COLUMN,
+            SEARCH_KEY_VALUE,
+            UNKNOWN_COLUMN,
+            MAX_CORRELATION_SET_SIZE,
+            false
+        );
+
+    Assert.assertEquals(Collections.emptySet(), correlatedValues);
+  }
+
+  /**
+   * Searching and retrieving the same key column for {@code SEARCH_KEY_VALUE} is expected to return
+   * exactly that value.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnShouldReturnSearchValue()
+  {
+    final Set<String> expected = ImmutableSet.of(SEARCH_KEY_VALUE);
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        KEY_COLUMN,
+        SEARCH_KEY_VALUE,
+        KEY_COLUMN,
+        MAX_CORRELATION_SET_SIZE,
+        false
+    );
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Same key-to-key lookup as the unrestricted case, but with a limit of 0: the expected result is an
+   * empty set.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchKeyAndRetrieveKeyColumnAboveLimitShouldReturnEmptySet()
+  {
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        KEY_COLUMN,
+        SEARCH_KEY_VALUE,
+        KEY_COLUMN,
+        0,
+        false
+    );
+
+    Assert.assertEquals(ImmutableSet.of(), actual);
+  }
+
+  /**
+   * Searching the key column for {@code SEARCH_KEY_VALUE} and retrieving the value column is expected
+   * to return {@code SEARCH_VALUE_VALUE}.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchKeyAndRetrieveValueColumnShouldReturnExtractedValue()
+  {
+    final Set<String> expected = ImmutableSet.of(SEARCH_VALUE_VALUE);
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        KEY_COLUMN,
+        SEARCH_KEY_VALUE,
+        VALUE_COLUMN,
+        MAX_CORRELATION_SET_SIZE,
+        false
+    );
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Searches the key column for {@code SEARCH_KEY_NULL_VALUE}, the key of the row whose value column is
+   * {@code null}, and expects a set containing only {@code null}.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchKeyMissingAndRetrieveValueColumnShouldReturnExtractedValue()
+  {
+    final Set<String> expected = Collections.singleton(null);
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        KEY_COLUMN,
+        SEARCH_KEY_NULL_VALUE,
+        VALUE_COLUMN,
+        MAX_CORRELATION_SET_SIZE,
+        false
+    );
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Searches the value column with the final flag set to {@code false}, retrieving first the value column
+   * and then the key column. Both calls are asserted to return an empty set.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnAndNonKeyColumnSearchDisabledShouldReturnSearchValue()
+  {
+    // Scenario 1: retrieve the value column itself.
+    final Set<String> fromValueColumn = target.getCorrelatedColumnValues(
+        VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        VALUE_COLUMN,
+        MAX_CORRELATION_SET_SIZE,
+        false
+    );
+    Assert.assertEquals(ImmutableSet.of(), fromValueColumn);
+
+    // Scenario 2: retrieve the key column, with a limit of 10.
+    final Set<String> fromKeyColumn = target.getCorrelatedColumnValues(
+        VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        KEY_COLUMN,
+        10,
+        false
+    );
+    Assert.assertEquals(ImmutableSet.of(), fromKeyColumn);
+  }
+
+  /**
+   * With the final flag set to {@code true}, searching and retrieving the value column for
+   * {@code SEARCH_VALUE_VALUE} is expected to return that same value.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchValueAndRetrieveValueColumnShouldReturnSearchValue()
+  {
+    final Set<String> expected = ImmutableSet.of(SEARCH_VALUE_VALUE);
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        VALUE_COLUMN,
+        MAX_CORRELATION_SET_SIZE,
+        true
+    );
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * With the final flag set to {@code true} and a limit of 10, searching the value column for
+   * {@code SEARCH_VALUE_VALUE} and retrieving the key column is expected to return {@code SEARCH_KEY_VALUE}.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnShouldReturnUnAppliedValue()
+  {
+    final Set<String> expected = ImmutableSet.of(SEARCH_KEY_VALUE);
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        KEY_COLUMN,
+        10,
+        true
+    );
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Same value-to-key lookup as the limit-10 case, but with a limit of 0: the expected result is an
+   * empty set.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchValueAndRetrieveKeyColumnWithMaxLimitSetShouldHonorMaxLimit()
+  {
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        VALUE_COLUMN,
+        SEARCH_VALUE_VALUE,
+        KEY_COLUMN,
+        0,
+        true
+    );
+
+    Assert.assertEquals(ImmutableSet.of(), actual);
+  }
+
+  /**
+   * Searching the value column for {@code SEARCH_VALUE_UNKNOWN} and retrieving the key column
+   * (limit 10, final flag {@code true}) is expected to return an empty set.
+   */
+  @Test
+  public void getCorrelatedColumnValuesForSearchUnknownValueAndRetrieveKeyColumnShouldReturnNoCorrelatedValues()
+  {
+    final Set<String> actual = target.getCorrelatedColumnValues(
+        VALUE_COLUMN,
+        SEARCH_VALUE_UNKNOWN,
+        KEY_COLUMN,
+        10,
+        true
+    );
+
+    Assert.assertEquals(ImmutableSet.of(), actual);
+  }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index bf96135..4fc1e8e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -154,12 +154,13 @@ public class BaseCalciteQueryTest extends CalciteTestBase
   public static final String DUMMY_SQL_ID = "dummy";
   public static final String LOS_ANGELES = "America/Los_Angeles";
 
-  public static final Map<String, Object> QUERY_CONTEXT_DEFAULT = 
ImmutableMap.of(
-      PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID,
-      PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, "2000-01-01T00:00:00Z",
-      QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS,
-      QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE
-  );
+  // Builder pre-populated with the default query context entries.
+  // NOTE(review): ImmutableMap.Builder is mutable. Any put() made through this shared constant stays in the
+  // builder for every later user, and build() rejects duplicate keys, so callers that need extra entries
+  // are safer copying QUERY_CONTEXT_DEFAULT into a builder of their own.
+  static final ImmutableMap.Builder<String, Object> 
DEFAULT_QUERY_CONTEXT_BUILDER =
+      ImmutableMap.<String, Object>builder()
+                  .put(PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID)
+                  .put(PlannerContext.CTX_SQL_CURRENT_TIMESTAMP, 
"2000-01-01T00:00:00Z")
+                  .put(QueryContexts.DEFAULT_TIMEOUT_KEY, 
QueryContexts.DEFAULT_TIMEOUT_MILLIS)
+                  .put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 
Long.MAX_VALUE);
+  // Immutable map built from the builder's contents at class initialization.
+  public static final Map<String, Object> QUERY_CONTEXT_DEFAULT = 
DEFAULT_QUERY_CONTEXT_BUILDER.build();
 
   public static final Map<String, Object> 
QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS = ImmutableMap.of(
       PlannerContext.CTX_SQL_QUERY_ID, DUMMY_SQL_ID,
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index fae92fb..5a16eb6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -8194,6 +8194,45 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
   }
 
   @Test
+  public void testFilterAndGroupByLookupUsingJoinOperatorWithValueFilterPushdown() throws Exception
+  {
+    // Cannot vectorize JOIN operator.
+    cannotVectorize();
+    // Start from a fresh builder seeded with the defaults. DEFAULT_QUERY_CONTEXT_BUILDER is a shared static;
+    // calling put() on it would leak this key to every later user, and a second build() with the same key
+    // would throw IllegalArgumentException.
+    final Map<String, Object> queryRewriteValueColumnFiltersContext = ImmutableMap.<String, Object>builder()
+        .putAll(QUERY_CONTEXT_DEFAULT)
+        .put(QueryContexts.JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY, true)
+        .build();
+    testQuery(
+        "SELECT lookyloo.k, COUNT(*)\n"
+        + "FROM foo LEFT JOIN lookup.lookyloo ON foo.dim2 = lookyloo.k\n"
+        + "WHERE lookyloo.v = 'xa'\n"
+        + "GROUP BY lookyloo.k",
+        queryRewriteValueColumnFiltersContext,
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new TableDataSource(CalciteTests.DATASOURCE1),
+                                new LookupDataSource("lookyloo"),
+                                "j0.",
+                                equalsCondition(DruidExpression.fromColumn("dim2"), DruidExpression.fromColumn("j0.k")),
+                                JoinType.LEFT
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setDimFilter(selector("j0.v", "xa", null))
+                        .setGranularity(Granularities.ALL)
+                        .setDimensions(dimensions(new DefaultDimensionSpec("j0.k", "d0")))
+                        .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                        .setContext(queryRewriteValueColumnFiltersContext)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"a", 2L}
+        )
+    );
+  }
+
+  @Test
   public void testFilterAndGroupByLookupUsingPostAggregationJoinOperator() 
throws Exception
   {
     testQuery(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to