This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 16d293d  Directly rewrite filters on RHS join columns into LHS 
equivalents (#9818)
16d293d is described below

commit 16d293d6e0d81963af3b3d468efd658317d91052
Author: Jonathan Wei <[email protected]>
AuthorDate: Fri May 8 23:45:35 2020 -0700

    Directly rewrite filters on RHS join columns into LHS equivalents (#9818)
    
    * Directly rewrite filters on RHS join columns into LHS equivalents
    
    * PR comments
    
    * Fix inspection
    
    * Revert unnecessary ExprMacroTable change
    
    * Fix build after merge
    
    * Address PR comments
---
 .../java/org/apache/druid/query/filter/Filter.java |  26 ++
 .../apache/druid/query/filter/LikeDimFilter.java   |  21 +
 .../apache/druid/segment/filter/BoundFilter.java   |  35 ++
 .../segment/filter/DimensionPredicateFilter.java   |  33 +-
 .../druid/segment/filter/ExpressionFilter.java     |   7 +
 .../apache/druid/segment/filter/FalseFilter.java   |  13 +
 .../org/apache/druid/segment/filter/Filters.java   |  23 +
 .../org/apache/druid/segment/filter/InFilter.java  |  27 ++
 .../apache/druid/segment/filter/LikeFilter.java    |  52 +++
 .../org/apache/druid/segment/filter/NotFilter.java |  13 +
 .../apache/druid/segment/filter/RegexFilter.java   |  56 +++
 .../druid/segment/filter/SearchQueryFilter.java    |  66 +++
 .../druid/segment/filter/SelectorFilter.java       |  27 ++
 .../apache/druid/segment/filter/TrueFilter.java    |  13 +
 .../segment/join/filter/JoinFilterAnalysis.java    |  16 +-
 .../segment/join/filter/JoinFilterAnalyzer.java    | 509 +++++++++++++++------
 .../segment/join/filter/JoinFilterPreAnalysis.java |  28 +-
 .../druid/segment/join/filter/JoinFilterSplit.java |   8 +-
 .../druid/query/filter/LikeDimFilterTest.java      |  10 +
 .../druid/segment/filter/BoundFilterTest.java      |  29 ++
 .../druid/segment/filter/ExpressionFilterTest.java |  17 +
 .../segment/filter/FilterCnfConversionTest.java    |  11 +
 .../apache/druid/segment/filter/InFilterTest.java  |  25 +
 .../druid/segment/filter/JavaScriptFilterTest.java |  18 +
 .../druid/segment/filter/LikeFilterTest.java       |  35 ++
 .../apache/druid/segment/filter/NotFilterTest.java |  20 +
 .../druid/segment/filter/RegexFilterTest.java      |  37 ++
 .../segment/filter/SearchQueryFilterTest.java      |  37 ++
 .../druid/segment/join/JoinFilterAnalyzerTest.java | 389 +++++++++++++---
 .../apache/druid/segment/join/JoinTestHelper.java  |   1 +
 .../src/test/resources/wikipedia/regions.json      |   3 +-
 31 files changed, 1387 insertions(+), 218 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/filter/Filter.java 
b/processing/src/main/java/org/apache/druid/query/filter/Filter.java
index 27a5047..4f2f4de 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/Filter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/Filter.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.ColumnSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
+import java.util.Map;
 import java.util.Set;
 
 public interface Filter
@@ -162,4 +163,29 @@ public interface Filter
    * can be expected to have a bitmap index retrievable via {@link 
BitmapIndexSelector#getBitmapIndex(String)}
    */
   Set<String> getRequiredColumns();
+
+  /**
+   * Returns true is this filter is able to return a copy of this filter that 
is identical to this filter except that it
+   * operates on different columns, based on a renaming map.
+   */
+  default boolean supportsRequiredColumnRewrite()
+  {
+    return false;
+  }
+
+  /**
+   * Return a copy of this filter that is identical to the this filter except 
that it operates on different columns,
+   * based on a renaming map where the key is the column to be renamed in the 
filter, and the value is the new
+   * column name.
+   *
+   * For example, if I have a filter (A = hello), and I have a renaming map (A 
-> B),
+   * this should return the filter (B = hello)
+   *
+   * @param columnRewrites Column rewrite map
+   * @return Copy of this filter that operates on new columns based on the 
rewrite map
+   */
+  default Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    throw new UnsupportedOperationException("Required column rewrite is not 
supported by this filter.");
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/filter/LikeDimFilter.java 
b/processing/src/main/java/org/apache/druid/query/filter/LikeDimFilter.java
index 29fac03..b99df80 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/LikeDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/LikeDimFilter.java
@@ -363,5 +363,26 @@ public class LikeDimFilter implements DimFilter
     {
       return suffixMatch;
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      LikeMatcher that = (LikeMatcher) o;
+      return getSuffixMatch() == that.getSuffixMatch() &&
+             Objects.equals(getPrefix(), that.getPrefix()) &&
+             Objects.equals(pattern.toString(), that.pattern.toString());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(getSuffixMatch(), getPrefix(), pattern.toString());
+    }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java
index 664740a..c7f0dc9 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/BoundFilter.java
@@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
 import it.unimi.dsi.fastutil.ints.IntList;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.extraction.ExtractionFn;
@@ -47,6 +48,7 @@ import org.apache.druid.segment.column.BitmapIndex;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import java.util.Comparator;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -172,6 +174,39 @@ public class BoundFilter implements Filter
     return boundDimFilter.getRequiredColumns();
   }
 
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = 
columnRewrites.get(boundDimFilter.getDimension());
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          boundDimFilter.getDimension()
+      );
+    }
+    BoundDimFilter newDimFilter = new BoundDimFilter(
+        rewriteDimensionTo,
+        boundDimFilter.getLower(),
+        boundDimFilter.getUpper(),
+        boundDimFilter.isLowerStrict(),
+        boundDimFilter.isUpperStrict(),
+        null,
+        boundDimFilter.getExtractionFn(),
+        boundDimFilter.getOrdering()
+    );
+    return new BoundFilter(
+        newDimFilter
+    );
+  }
+
   private static Pair<Integer, Integer> getStartEndIndexes(
       final BoundDimFilter boundDimFilter,
       final BitmapIndex bitmapIndex
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
 
b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
index 3bc3918..f18c01d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/filter/DimensionPredicateFilter.java
@@ -40,17 +40,18 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
+import java.util.Objects;
 import java.util.Set;
 
 /**
  */
 public class DimensionPredicateFilter implements Filter
 {
-  private final String dimension;
-  private final DruidPredicateFactory predicateFactory;
-  private final String basePredicateString;
-  private final ExtractionFn extractionFn;
-  private final FilterTuning filterTuning;
+  protected final String dimension;
+  protected final DruidPredicateFactory predicateFactory;
+  protected final String basePredicateString;
+  protected final ExtractionFn extractionFn;
+  protected final FilterTuning filterTuning;
 
   public DimensionPredicateFilter(
       final String dimension,
@@ -218,4 +219,26 @@ public class DimensionPredicateFilter implements Filter
       return StringUtils.format("%s = %s", dimension, basePredicateString);
     }
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DimensionPredicateFilter that = (DimensionPredicateFilter) o;
+    return Objects.equals(dimension, that.dimension) &&
+           Objects.equals(basePredicateString, that.basePredicateString) &&
+           Objects.equals(extractionFn, that.extractionFn) &&
+           Objects.equals(filterTuning, that.filterTuning);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dimension, basePredicateString, extractionFn, 
filterTuning);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
 
b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
index 8283fd7..52238cb 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java
@@ -176,4 +176,11 @@ public class ExpressionFilter implements Filter
   {
     return requiredBindings.get();
   }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    // We could support this, but need a good approach to rewriting the 
identifiers within an expression.
+    return false;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/FalseFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/FalseFilter.java
index 902dae4..b7bd2b2 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/FalseFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/FalseFilter.java
@@ -30,6 +30,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 public class FalseFilter implements Filter
@@ -100,6 +101,18 @@ public class FalseFilter implements Filter
   }
 
   @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    return this;
+  }
+
+  @Override
   public String toString()
   {
     return "false";
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/Filters.java 
b/processing/src/main/java/org/apache/druid/segment/filter/Filters.java
index 72e343c..a1f536d 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/Filters.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/Filters.java
@@ -498,4 +498,27 @@ public class Filters
 
     return new AndFilter(filterList);
   }
+
+  /**
+   * Create a filter representing an OR relationship across a set of filters.
+   *
+   * @param filterSet Set of filters
+   *
+   * @return If filterSet has more than one element, return an OR filter 
composed of the filters from filterSet
+   * If filterSet has a single element, return that element alone
+   * If filterSet is empty, return null
+   */
+  @Nullable
+  public static Filter or(Set<Filter> filterSet)
+  {
+    if (filterSet.isEmpty()) {
+      return null;
+    }
+
+    if (filterSet.size() == 1) {
+      return filterSet.iterator().next();
+    }
+
+    return new OrFilter(filterSet);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java
index d609a54..a230520 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/InFilter.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet;
 import it.unimi.dsi.fastutil.ints.IntIterable;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.BitmapIndexSelector;
@@ -45,6 +46,7 @@ import org.apache.druid.segment.column.BitmapIndex;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -185,6 +187,31 @@ public class InFilter implements Filter
   }
 
   @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(dimension);
+    if (rewriteDimensionTo == null) {
+      throw new IAE("Received a non-applicable rewrite: %s, filter's 
dimension: %s", columnRewrites, dimension);
+    }
+
+    return new InFilter(
+        rewriteDimensionTo,
+        values,
+        longPredicateSupplier,
+        floatPredicateSupplier,
+        doublePredicateSupplier,
+        extractionFn,
+        filterTuning
+    );
+  }
+
+  @Override
   public boolean supportsBitmapIndex(BitmapIndexSelector selector)
   {
     return selector.getBitmapIndex(dimension) != null;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java
index aa7f952..72332c5 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/LikeFilter.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.IntIterable;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.BitmapIndexSelector;
@@ -44,7 +45,9 @@ import 
org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 
 public class LikeFilter implements Filter
@@ -108,6 +111,33 @@ public class LikeFilter implements Filter
   }
 
   @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(dimension);
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          dimension
+      );
+    }
+
+    return new LikeFilter(
+        rewriteDimensionTo,
+        extractionFn,
+        likeMatcher,
+        filterTuning
+    );
+  }
+
+  @Override
   public boolean supportsBitmapIndex(BitmapIndexSelector selector)
   {
     return selector.getBitmapIndex(dimension) != null;
@@ -253,4 +283,26 @@ public class LikeFilter implements Filter
       }
     };
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LikeFilter that = (LikeFilter) o;
+    return Objects.equals(dimension, that.dimension) &&
+           Objects.equals(extractionFn, that.extractionFn) &&
+           Objects.equals(likeMatcher, that.likeMatcher) &&
+           Objects.equals(filterTuning, that.filterTuning);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dimension, extractionFn, likeMatcher, filterTuning);
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/NotFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/NotFilter.java
index a82a57f..d4f7b6c 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/NotFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/NotFilter.java
@@ -33,6 +33,7 @@ import org.apache.druid.segment.ColumnSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -112,6 +113,18 @@ public class NotFilter implements Filter
   }
 
   @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return baseFilter.supportsRequiredColumnRewrite();
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    return new NotFilter(baseFilter.rewriteRequiredColumns(columnRewrites));
+  }
+
+  @Override
   public boolean supportsBitmapIndex(BitmapIndexSelector selector)
   {
     return baseFilter.supportsBitmapIndex(selector);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/RegexFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/RegexFilter.java
index c71841a..d11bdb3 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/RegexFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/RegexFilter.java
@@ -20,19 +20,25 @@
 package org.apache.druid.segment.filter;
 
 import com.google.common.base.Predicate;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.DruidDoublePredicate;
 import org.apache.druid.query.filter.DruidFloatPredicate;
 import org.apache.druid.query.filter.DruidLongPredicate;
 import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.FilterTuning;
 
+import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 
 /**
  */
 public class RegexFilter extends DimensionPredicateFilter
 {
+  private final Pattern pattern;
+
   public RegexFilter(
       final String dimension,
       final Pattern pattern,
@@ -79,5 +85,55 @@ public class RegexFilter extends DimensionPredicateFilter
         extractionFn,
         filterTuning
     );
+    this.pattern = pattern;
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(dimension);
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          dimension
+      );
+    }
+
+    return new RegexFilter(
+        rewriteDimensionTo,
+        pattern,
+        extractionFn,
+        filterTuning
+    );
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    RegexFilter that = (RegexFilter) o;
+    return Objects.equals(pattern.toString(), that.pattern.toString());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), pattern.toString());
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/SearchQueryFilter.java
 
b/processing/src/main/java/org/apache/druid/segment/filter/SearchQueryFilter.java
index 81424fe..741e5b3 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/filter/SearchQueryFilter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/filter/SearchQueryFilter.java
@@ -22,18 +22,25 @@ package org.apache.druid.segment.filter;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Predicate;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.DruidDoublePredicate;
 import org.apache.druid.query.filter.DruidFloatPredicate;
 import org.apache.druid.query.filter.DruidLongPredicate;
 import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.FilterTuning;
 import org.apache.druid.query.search.SearchQuerySpec;
 
+import java.util.Map;
+import java.util.Objects;
+
 /**
  */
 public class SearchQueryFilter extends DimensionPredicateFilter
 {
+  private final SearchQuerySpec query;
+
   @JsonCreator
   public SearchQueryFilter(
       @JsonProperty("dimension") final String dimension,
@@ -69,9 +76,68 @@ public class SearchQueryFilter extends 
DimensionPredicateFilter
           {
             return input -> query.accept(String.valueOf(input));
           }
+
+          @Override
+          public String toString()
+          {
+            return "SearchFilter{" +
+                   "query='" + query + '\'' +
+                   '}';
+          }
         },
         extractionFn,
         filterTuning
     );
+
+    this.query = query;
+  }
+
+  @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(dimension);
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          dimension
+      );
+    }
+
+    return new SearchQueryFilter(
+        rewriteDimensionTo,
+        query,
+        extractionFn,
+        filterTuning
+    );
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    SearchQueryFilter that = (SearchQueryFilter) o;
+    return Objects.equals(query, that.query);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), query);
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java
index f56bfac..fbe8034 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/filter/SelectorFilter.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.filter;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.filter.BitmapIndexSelector;
@@ -34,6 +35,7 @@ import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -129,6 +131,31 @@ public class SelectorFilter implements Filter
   }
 
   @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    String rewriteDimensionTo = columnRewrites.get(dimension);
+
+    if (rewriteDimensionTo == null) {
+      throw new IAE(
+          "Received a non-applicable rewrite: %s, filter's dimension: %s",
+          columnRewrites,
+          dimension
+      );
+    }
+
+    return new SelectorFilter(
+        rewriteDimensionTo,
+        value
+    );
+  }
+
+  @Override
   public String toString()
   {
     return StringUtils.format("%s = %s", dimension, value);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/filter/TrueFilter.java 
b/processing/src/main/java/org/apache/druid/segment/filter/TrueFilter.java
index bb35b4f..f760418 100644
--- a/processing/src/main/java/org/apache/druid/segment/filter/TrueFilter.java
+++ b/processing/src/main/java/org/apache/druid/segment/filter/TrueFilter.java
@@ -30,6 +30,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -96,6 +97,18 @@ public class TrueFilter implements Filter
   }
 
   @Override
+  public boolean supportsRequiredColumnRewrite()
+  {
+    return true;
+  }
+
+  @Override
+  public Filter rewriteRequiredColumns(Map<String, String> columnRewrites)
+  {
+    return this;
+  }
+
+  @Override
   public double estimateSelectivity(BitmapIndexSelector indexSelector)
   {
     return 1;
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java
 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java
index 4f8166a..ef6d869 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalysis.java
@@ -19,12 +19,9 @@
 
 package org.apache.druid.segment.join.filter;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.druid.query.filter.Filter;
-import org.apache.druid.segment.VirtualColumn;
 
 import javax.annotation.Nullable;
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -40,19 +37,16 @@ public class JoinFilterAnalysis
   private final boolean retainAfterJoin;
   private final Filter originalFilter;
   private final Optional<Filter> pushDownFilter;
-  private final List<VirtualColumn> pushDownVirtualColumns;
 
   public JoinFilterAnalysis(
       boolean retainAfterJoin,
       Filter originalFilter,
-      @Nullable Filter pushDownFilter,
-      List<VirtualColumn> pushDownVirtualColumns
+      @Nullable Filter pushDownFilter
   )
   {
     this.retainAfterJoin = retainAfterJoin;
     this.originalFilter = originalFilter;
     this.pushDownFilter = pushDownFilter == null ? Optional.empty() : 
Optional.of(pushDownFilter);
-    this.pushDownVirtualColumns = pushDownVirtualColumns;
   }
 
   public boolean isCanPushDown()
@@ -75,11 +69,6 @@ public class JoinFilterAnalysis
     return pushDownFilter;
   }
 
-  public List<VirtualColumn> getPushDownVirtualColumns()
-  {
-    return pushDownVirtualColumns;
-  }
-
   /**
    * Utility method for generating an analysis that represents: "Filter cannot 
be pushed down"
    *
@@ -92,8 +81,7 @@ public class JoinFilterAnalysis
     return new JoinFilterAnalysis(
         true,
         originalFilter,
-        null,
-        ImmutableList.of()
+        null
     );
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
index 7b2032f..77f4c9d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterAnalyzer.java
@@ -19,7 +19,8 @@
 
 package org.apache.druid.segment.join.filter;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.query.filter.Filter;
@@ -44,6 +45,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -60,7 +63,7 @@ import java.util.Set;
  * A filter clause can be pushed down if it meets one of the following 
conditions:
  * - The filter only applies to columns from the base table
  * - The filter applies to columns from the join table, and we determine that 
the filter can be rewritten
- *   into a filter on columns from the base table
+ * into a filter on columns from the base table
  * 
  * For the second case, where we rewrite filter clauses, the rewritten clause 
can be less selective than the original,
  * so we preserve the original clause in the post-join filtering phase.
@@ -86,7 +89,7 @@ public class JoinFilterAnalyzer
    * where we convert the query filter (if any) into conjunctive normal form 
and then
    * determine the structure of RHS filter rewrites (if any), since this 
information is shared across all
    * per-segment operations.
-   *
+   * 
    * See {@link JoinFilterPreAnalysis} for details on the result of this 
pre-analysis step.
    *
    * @param joinableClauses                 The joinable clauses from the query
@@ -124,8 +127,10 @@ public class JoinFilterAnalyzer
           null,
           null,
           null,
+          null,
           enableFilterPushDown,
-          enableFilterRewrite
+          enableFilterRewrite,
+          Collections.emptyMap()
       );
     }
 
@@ -165,8 +170,10 @@ public class JoinFilterAnalyzer
           normalizedBaseTableClauses,
           normalizedJoinTableClauses,
           null,
+          null,
           enableFilterPushDown,
-          enableFilterRewrite
+          enableFilterRewrite,
+          Collections.emptyMap()
       );
     }
 
@@ -182,64 +189,79 @@ public class JoinFilterAnalyzer
       }
     }
 
-    Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> 
correlationsByPrefix = new HashMap<>();
-
     // Determine candidates for filter rewrites.
     // A candidate is an RHS column that appears in a filter, along with the 
value being filtered on, plus
     // the joinable clause associated with the table that the RHS column is 
from.
-    Set<RhsRewriteCandidate> rhsRewriteCandidates = new HashSet<>();
+    Set<RhsRewriteCandidate> rhsRewriteCandidates = new LinkedHashSet<>();
     for (Filter orClause : normalizedJoinTableClauses) {
       if (filterMatchesNull(orClause)) {
         continue;
       }
 
-      if (orClause instanceof SelectorFilter) {
-        // this is a candidate for RHS filter rewrite, determine column 
correlations and correlated values
-        String reqColumn = ((SelectorFilter) orClause).getDimension();
-        String reqValue = ((SelectorFilter) orClause).getValue();
-        JoinableClause joinableClause = isColumnFromJoin(joinableClauses, 
reqColumn);
-        if (joinableClause != null) {
-          rhsRewriteCandidates.add(
-              new RhsRewriteCandidate(
-                  joinableClause,
-                  reqColumn,
-                  reqValue
-              )
-          );
-        }
-      }
-
       if (orClause instanceof OrFilter) {
         for (Filter subFilter : ((OrFilter) orClause).getFilters()) {
-          if (subFilter instanceof SelectorFilter) {
-            String reqColumn = ((SelectorFilter) subFilter).getDimension();
-            String reqValue = ((SelectorFilter) subFilter).getValue();
-            JoinableClause joinableClause = isColumnFromJoin(joinableClauses, 
reqColumn);
-            if (joinableClause != null) {
-              rhsRewriteCandidates.add(
-                  new RhsRewriteCandidate(
-                      joinableClause,
-                      reqColumn,
-                      reqValue
-                  )
-              );
-            }
+          Optional<RhsRewriteCandidate> rhsRewriteCandidate = 
determineRhsRewriteCandidatesForSingleFilter(
+              subFilter,
+              equiconditions,
+              joinableClauses
+          );
+
+          if (rhsRewriteCandidate.isPresent()) {
+            rhsRewriteCandidates.add(rhsRewriteCandidate.get());
           }
         }
+        continue;
+      }
+
+      Optional<RhsRewriteCandidate> rhsRewriteCandidate = 
determineRhsRewriteCandidatesForSingleFilter(
+          orClause,
+          equiconditions,
+          joinableClauses
+      );
+
+      if (rhsRewriteCandidate.isPresent()) {
+        rhsRewriteCandidates.add(rhsRewriteCandidate.get());
       }
     }
 
     // Build a map of RHS table prefix -> JoinFilterColumnCorrelationAnalysis 
based on the RHS rewrite candidates
+    Map<String, Optional<Map<String, JoinFilterColumnCorrelationAnalysis>>> 
correlationsByPrefix = new HashMap<>();
+    Map<String, Optional<JoinFilterColumnCorrelationAnalysis>> 
directRewriteCorrelations = new HashMap<>();
+
     for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates) {
-      correlationsByPrefix.computeIfAbsent(
-          rhsRewriteCandidate.getJoinableClause().getPrefix(),
-          p -> findCorrelatedBaseTableColumns(
-              joinableClauses,
-              p,
-              rhsRewriteCandidate.getJoinableClause(),
-              equiconditions
-          )
-      );
+      if (rhsRewriteCandidate.isDirectRewrite()) {
+        directRewriteCorrelations.computeIfAbsent(
+            rhsRewriteCandidate.getRhsColumn(),
+            c -> {
+              Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> 
correlatedBaseTableColumns =
+                  findCorrelatedBaseTableColumns(
+                      joinableClauses,
+                      c,
+                      rhsRewriteCandidate,
+                      equiconditions
+                  );
+              if (!correlatedBaseTableColumns.isPresent()) {
+                return Optional.empty();
+              } else {
+                JoinFilterColumnCorrelationAnalysis baseColumnAnalysis = 
correlatedBaseTableColumns.get().get(c);
+                // for direct rewrites, there will only be one analysis keyed 
by the RHS column
+                assert (baseColumnAnalysis != null);
+                return Optional.of(correlatedBaseTableColumns.get().get(c));
+              }
+
+            }
+        );
+      } else {
+        correlationsByPrefix.computeIfAbsent(
+            rhsRewriteCandidate.getJoinableClause().getPrefix(),
+            p -> findCorrelatedBaseTableColumns(
+                joinableClauses,
+                p,
+                rhsRewriteCandidate,
+                equiconditions
+            )
+        );
+      }
     }
 
     // Using the RHS table prefix -> JoinFilterColumnCorrelationAnalysis 
created in the previous step,
@@ -248,23 +270,40 @@ public class JoinFilterAnalyzer
     // JoinFilterColumnCorrelationAnalysis objects, which are shared across 
all rhsFilterColumn entries that belong
     // to the same RHS table.
     //
-    // The value is a List<JoinFilterColumnCorreationAnalysis> instead of a 
single value because a table can be joined
+    // The value is a List<JoinFilterColumnCorrelationAnalysis> instead of a 
single value because a table can be joined
     // to another via multiple columns.
     // (See 
JoinFilterAnalyzerTest.test_filterPushDown_factToRegionOneColumnToTwoRHSColumnsAndFilterOnRHS
 for an example)
-    Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> 
correlationsByFilteringColumn = new HashMap<>();
+    Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlationsByFilteringColumn = new LinkedHashMap<>();
+    Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlationsByDirectFilteringColumn = new LinkedHashMap<>();
     for (RhsRewriteCandidate rhsRewriteCandidate : rhsRewriteCandidates) {
+      if (rhsRewriteCandidate.isDirectRewrite()) {
+        List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
+            correlationsByDirectFilteringColumn.computeIfAbsent(
+                rhsRewriteCandidate.getRhsColumn(),
+                (rhsCol) -> {
+                  return new ArrayList<>();
+                }
+            );
+        perColumnCorrelations.add(
+            
directRewriteCorrelations.get(rhsRewriteCandidate.getRhsColumn()).get()
+        );
+        continue;
+      }
+
       Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> 
correlationsForPrefix = correlationsByPrefix.get(
           rhsRewriteCandidate.getJoinableClause().getPrefix()
       );
       if (correlationsForPrefix.isPresent()) {
         for (Map.Entry<String, JoinFilterColumnCorrelationAnalysis> 
correlationForPrefix : correlationsForPrefix.get()
                                                                                
                                 .entrySet()) {
-          Optional<List<JoinFilterColumnCorrelationAnalysis>> 
perColumnCorrelations =
+          List<JoinFilterColumnCorrelationAnalysis> perColumnCorrelations =
               correlationsByFilteringColumn.computeIfAbsent(
                   rhsRewriteCandidate.getRhsColumn(),
-                  (rhsCol) -> Optional.of(new ArrayList<>())
+                  (rhsCol) -> {
+                    return new ArrayList<>();
+                  }
               );
-          perColumnCorrelations.get().add(correlationForPrefix.getValue());
+          perColumnCorrelations.add(correlationForPrefix.getValue());
           
correlationForPrefix.getValue().getCorrelatedValuesMap().computeIfAbsent(
               Pair.of(rhsRewriteCandidate.getRhsColumn(), 
rhsRewriteCandidate.getValueForRewrite()),
               (rhsVal) -> {
@@ -286,19 +325,30 @@ public class JoinFilterAnalyzer
           );
         }
       } else {
-        correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), 
Optional.empty());
+        correlationsByFilteringColumn.put(rhsRewriteCandidate.getRhsColumn(), 
null);
       }
     }
 
     // Go through each per-column analysis list and prune duplicates
-    for (Map.Entry<String, 
Optional<List<JoinFilterColumnCorrelationAnalysis>>> correlation : 
correlationsByFilteringColumn.entrySet()) {
-      if (correlation.getValue().isPresent()) {
+    for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlation : correlationsByFilteringColumn
+        .entrySet()) {
+      if (correlation.getValue() != null) {
         List<JoinFilterColumnCorrelationAnalysis> dedupList = 
eliminateCorrelationDuplicates(
-            correlation.getValue().get()
+            correlation.getValue()
         );
-        correlationsByFilteringColumn.put(correlation.getKey(), 
Optional.of(dedupList));
+        correlationsByFilteringColumn.put(correlation.getKey(), dedupList);
       }
     }
+    for (Map.Entry<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlation : correlationsByDirectFilteringColumn
+        .entrySet()) {
+      if (correlation.getValue() != null) {
+        List<JoinFilterColumnCorrelationAnalysis> dedupList = 
eliminateCorrelationDuplicates(
+            correlation.getValue()
+        );
+        correlationsByDirectFilteringColumn.put(correlation.getKey(), 
dedupList);
+      }
+    }
+
 
     return new JoinFilterPreAnalysis(
         joinableClauses,
@@ -307,13 +357,73 @@ public class JoinFilterAnalyzer
         normalizedBaseTableClauses,
         normalizedJoinTableClauses,
         correlationsByFilteringColumn,
+        correlationsByDirectFilteringColumn,
         enableFilterPushDown,
-        enableFilterRewrite
+        enableFilterRewrite,
+        equiconditions
     );
   }
 
+  private static Optional<RhsRewriteCandidate> 
determineRhsRewriteCandidatesForSingleFilter(
+      Filter orClause,
+      Map<String, Set<Expr>> equiconditions,
+      List<JoinableClause> joinableClauses
+  )
+  {
+    // Check if the filter clause is on the RHS join column. If so, we can 
rewrite the clause to filter on the
+    // LHS join column instead.
+    // Currently, we only support rewrites of filters that operate on a single 
column for simplicity.
+    Set<String> requiredColumns = orClause.getRequiredColumns();
+    if (orClause.supportsRequiredColumnRewrite() &&
+        doesRequiredColumnSetSupportDirectJoinFilterRewrite(requiredColumns, 
equiconditions)) {
+      String reqColumn = requiredColumns.iterator().next();
+      JoinableClause joinableClause = isColumnFromJoin(joinableClauses, 
reqColumn);
+
+      return Optional.of(
+          new RhsRewriteCandidate(
+              joinableClause,
+              reqColumn,
+              null,
+              true
+          )
+      );
+    } else if (orClause instanceof SelectorFilter) {
+      // this is a candidate for RHS filter rewrite, determine column 
correlations and correlated values
+      String reqColumn = ((SelectorFilter) orClause).getDimension();
+      String reqValue = ((SelectorFilter) orClause).getValue();
+      JoinableClause joinableClause = isColumnFromJoin(joinableClauses, 
reqColumn);
+      if (joinableClause != null) {
+        return Optional.of(
+            new RhsRewriteCandidate(
+                joinableClause,
+                reqColumn,
+                reqValue,
+                false
+            )
+        );
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  private static boolean doesRequiredColumnSetSupportDirectJoinFilterRewrite(
+      Set<String> requiredColumns,
+      Map<String, Set<Expr>> equiconditions
+  )
+  {
+    if (requiredColumns.size() == 1) {
+      String reqColumn = requiredColumns.iterator().next();
+      if (equiconditions.containsKey(reqColumn)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * @param joinFilterPreAnalysis The pre-analysis computed by {@link 
#computeJoinFilterPreAnalysis)}
+   *
    * @return A JoinFilterSplit indicating what parts of the filter should be 
applied pre-join and post-join
    */
   public static JoinFilterSplit splitFilter(
@@ -324,14 +434,14 @@ public class JoinFilterAnalyzer
       return new JoinFilterSplit(
           null,
           joinFilterPreAnalysis.getOriginalFilter(),
-          ImmutableList.of()
+          ImmutableSet.of()
       );
     }
 
     // Pushdown filters, rewriting if necessary
     List<Filter> leftFilters = new ArrayList<>();
     List<Filter> rightFilters = new ArrayList<>();
-    List<VirtualColumn> pushDownVirtualColumns = new ArrayList<>();
+    Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs = new 
HashMap<>();
 
     for (Filter baseTableFilter : 
joinFilterPreAnalysis.getNormalizedBaseTableClauses()) {
       if (!filterMatchesNull(baseTableFilter)) {
@@ -344,14 +454,12 @@ public class JoinFilterAnalyzer
     for (Filter orClause : 
joinFilterPreAnalysis.getNormalizedJoinTableClauses()) {
       JoinFilterAnalysis joinFilterAnalysis = analyzeJoinFilterClause(
           orClause,
-          joinFilterPreAnalysis
+          joinFilterPreAnalysis,
+          pushDownVirtualColumnsForLhsExprs
       );
       if (joinFilterAnalysis.isCanPushDown()) {
         //noinspection OptionalGetWithoutIsPresent isCanPushDown checks 
isPresent
         leftFilters.add(joinFilterAnalysis.getPushDownFilter().get());
-        if (!joinFilterAnalysis.getPushDownVirtualColumns().isEmpty()) {
-          
pushDownVirtualColumns.addAll(joinFilterAnalysis.getPushDownVirtualColumns());
-        }
       }
       if (joinFilterAnalysis.isRetainAfterJoin()) {
         rightFilters.add(joinFilterAnalysis.getOriginalFilter());
@@ -361,7 +469,7 @@ public class JoinFilterAnalyzer
     return new JoinFilterSplit(
         Filters.and(leftFilters),
         Filters.and(rightFilters),
-        pushDownVirtualColumns
+        new HashSet<>(pushDownVirtualColumnsForLhsExprs.values())
     );
   }
 
@@ -370,14 +478,23 @@ public class JoinFilterAnalyzer
    * Analyze a filter clause from a filter that is in conjunctive normal form 
(AND of ORs).
    * The clause is expected to be an OR filter or a leaf filter.
    *
-   * @param filterClause     Individual filter clause (an OR filter or a leaf 
filter) from a filter that is in CNF
-   * @param joinFilterPreAnalysis The pre-analysis computed by {@link 
#computeJoinFilterPreAnalysis)}
+   * @param filterClause                       Individual filter clause (an OR 
filter or a leaf filter) from a filter that is in CNF
+   * @param joinFilterPreAnalysis              The pre-analysis computed by 
{@link #computeJoinFilterPreAnalysis)}
+   * @param pushDownVirtualColumnsForLhsExprs  Used when there are LHS 
expressions in the join equiconditions.
+   *                                           If we rewrite an RHS filter 
such that it applies to the LHS expression instead,
+   *                                           because the expression existed 
only in the equicondition, we must create a virtual column
+   *                                           on the LHS with the same 
expression in order to apply the filter.
+   *                                           The specific rewriting methods 
such as {@link #rewriteSelectorFilter} will use this
+   *                                           as a cache for virtual columns 
that they need to created, keyed by the expression, so that
+   *                                           they can avoid creating 
redundant virtual columns.
+   *
    *
    * @return a JoinFilterAnalysis that contains a possible filter rewrite and 
information on how to handle the filter.
    */
   private static JoinFilterAnalysis analyzeJoinFilterClause(
       Filter filterClause,
-      JoinFilterPreAnalysis joinFilterPreAnalysis
+      JoinFilterPreAnalysis joinFilterPreAnalysis,
+      Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
   )
   {
     // NULL matching conditions are not currently pushed down.
@@ -387,87 +504,178 @@ public class JoinFilterAnalyzer
       return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
     }
 
+    if (filterClause instanceof OrFilter) {
+      return rewriteOrFilter(
+          (OrFilter) filterClause,
+          joinFilterPreAnalysis,
+          pushDownVirtualColumnsForLhsExprs
+      );
+    }
+
+    if (filterClause.supportsRequiredColumnRewrite() && 
doesRequiredColumnSetSupportDirectJoinFilterRewrite(
+        filterClause.getRequiredColumns(),
+        joinFilterPreAnalysis.getEquiconditions()
+    )) {
+      return rewriteFilterDirect(
+          filterClause,
+          joinFilterPreAnalysis,
+          pushDownVirtualColumnsForLhsExprs
+      );
+    }
+
     // Currently we only support rewrites of selector filters and selector 
filters within OR filters.
     if (filterClause instanceof SelectorFilter) {
       return rewriteSelectorFilter(
           (SelectorFilter) filterClause,
-          joinFilterPreAnalysis
+          joinFilterPreAnalysis,
+          pushDownVirtualColumnsForLhsExprs
       );
     }
 
-    if (filterClause instanceof OrFilter) {
-      return rewriteOrFilter(
-          (OrFilter) filterClause,
-          joinFilterPreAnalysis
-      );
+    return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
+  }
+
+  private static JoinFilterAnalysis rewriteFilterDirect(
+      Filter filterClause,
+      JoinFilterPreAnalysis joinFilterPreAnalysis,
+      Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
+  )
+  {
+    if (!filterClause.supportsRequiredColumnRewrite()) {
+      return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
     }
 
-    return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
+    List<Filter> newFilters = new ArrayList<>();
+
+    // we only support direct rewrites of filters that reference a single 
column
+    String reqColumn = filterClause.getRequiredColumns().iterator().next();
+
+    List<JoinFilterColumnCorrelationAnalysis> correlationAnalyses = 
joinFilterPreAnalysis.getCorrelationsByDirectFilteringColumn()
+                                                                               
          .get(reqColumn);
+
+    if (correlationAnalyses == null) {
+      return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
+    }
+
+    for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : 
correlationAnalyses) {
+      if (correlationAnalysis.supportsPushDown()) {
+        for (String correlatedBaseColumn : 
correlationAnalysis.getBaseColumns()) {
+          Filter rewrittenFilter = 
filterClause.rewriteRequiredColumns(ImmutableMap.of(
+              reqColumn,
+              correlatedBaseColumn
+          ));
+          newFilters.add(rewrittenFilter);
+        }
+
+        for (Expr correlatedBaseExpr : 
correlationAnalysis.getBaseExpressions()) {
+          // We need to create a virtual column for the expressions when 
pushing down
+          VirtualColumn pushDownVirtualColumn = 
pushDownVirtualColumnsForLhsExprs.computeIfAbsent(
+              correlatedBaseExpr,
+              (expr) -> {
+                String vcName = 
getCorrelatedBaseExprVirtualColumnName(pushDownVirtualColumnsForLhsExprs.size());
+                return new ExpressionVirtualColumn(
+                    vcName,
+                    correlatedBaseExpr,
+                    ValueType.STRING
+                );
+              }
+          );
+
+          Filter rewrittenFilter = 
filterClause.rewriteRequiredColumns(ImmutableMap.of(
+              reqColumn,
+              pushDownVirtualColumn.getOutputName()
+          ));
+          newFilters.add(rewrittenFilter);
+        }
+      }
+    }
+
+    if (newFilters.isEmpty()) {
+      return JoinFilterAnalysis.createNoPushdownFilterAnalysis(filterClause);
+    }
+
+    return new JoinFilterAnalysis(
+        false,
+        filterClause,
+        Filters.and(newFilters)
+    );
   }
 
   /**
    * Potentially rewrite the subfilters of an OR filter so that the whole OR 
filter can be pushed down to
    * the base table.
    *
-   * @param orFilter         OrFilter to be rewritten
+   * @param orFilter              OrFilter to be rewritten
    * @param joinFilterPreAnalysis The pre-analysis computed by {@link 
#computeJoinFilterPreAnalysis)}
-   *
+   * @param pushDownVirtualColumnsForLhsExprs See comments on {@link 
#analyzeJoinFilterClause}
    * @return A JoinFilterAnalysis indicating how to handle the potentially 
rewritten filter
    */
   private static JoinFilterAnalysis rewriteOrFilter(
       OrFilter orFilter,
-      JoinFilterPreAnalysis joinFilterPreAnalysis
+      JoinFilterPreAnalysis joinFilterPreAnalysis,
+      Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
   )
   {
-    boolean retainRhs = false;
     Set<Filter> newFilters = new HashSet<>();
+    boolean retainRhs = false;
+
     for (Filter filter : orFilter.getFilters()) {
       if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), 
filter.getRequiredColumns())) {
         newFilters.add(filter);
         continue;
       }
 
-      retainRhs = true;
-      if (filter instanceof SelectorFilter) {
-        JoinFilterAnalysis rewritten = rewriteSelectorFilter(
+      JoinFilterAnalysis rewritten = null;
+      if (doesRequiredColumnSetSupportDirectJoinFilterRewrite(
+          filter.getRequiredColumns(),
+          joinFilterPreAnalysis.getEquiconditions()
+      )) {
+        rewritten = rewriteFilterDirect(
+            filter,
+            joinFilterPreAnalysis,
+            pushDownVirtualColumnsForLhsExprs
+        );
+      } else if (filter instanceof SelectorFilter) {
+        retainRhs = true;
+        // We could optimize retainRhs handling further by introducing a 
"filter to retain" property to the
+        // analysis, and only keeping the subfilters that need to be retained
+        rewritten = rewriteSelectorFilter(
             (SelectorFilter) filter,
-            joinFilterPreAnalysis
+            joinFilterPreAnalysis,
+            pushDownVirtualColumnsForLhsExprs
         );
-        if (!rewritten.isCanPushDown()) {
-          return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
-        } else {
-          //noinspection OptionalGetWithoutIsPresent isCanPushDown checks 
isPresent
-          newFilters.add(rewritten.getPushDownFilter().get());
-        }
-      } else {
+      }
+
+      if (rewritten == null || !rewritten.isCanPushDown()) {
         return JoinFilterAnalysis.createNoPushdownFilterAnalysis(orFilter);
+      } else {
+        //noinspection OptionalGetWithoutIsPresent isCanPushDown checks 
isPresent
+        newFilters.add(rewritten.getPushDownFilter().get());
       }
     }
 
     return new JoinFilterAnalysis(
         retainRhs,
         orFilter,
-        new OrFilter(newFilters),
-        ImmutableList.of()
+        Filters.or(newFilters)
     );
   }
 
   /**
    * Rewrites a selector filter on a join table into an IN filter on the base 
table.
    *
-   * @param selectorFilter   SelectorFilter to be rewritten
+   * @param selectorFilter        SelectorFilter to be rewritten
    * @param joinFilterPreAnalysis The pre-analysis computed by {@link 
#computeJoinFilterPreAnalysis)}
-   *
+   * @param pushDownVirtualColumnsForLhsExprs See comments on {@link 
#analyzeJoinFilterClause}
    * @return A JoinFilterAnalysis that indicates how to handle the potentially 
rewritten filter
    */
   private static JoinFilterAnalysis rewriteSelectorFilter(
       SelectorFilter selectorFilter,
-      JoinFilterPreAnalysis joinFilterPreAnalysis
+      JoinFilterPreAnalysis joinFilterPreAnalysis,
+      Map<Expr, VirtualColumn> pushDownVirtualColumnsForLhsExprs
   )
   {
-
     List<Filter> newFilters = new ArrayList<>();
-    List<VirtualColumn> pushdownVirtualColumns = new ArrayList<>();
 
     String filteringColumn = selectorFilter.getDimension();
     String filteringValue = selectorFilter.getValue();
@@ -481,22 +689,20 @@ public class JoinFilterAnalyzer
 
     if (!areSomeColumnsFromJoin(joinFilterPreAnalysis.getJoinableClauses(), 
selectorFilter.getRequiredColumns())) {
       return new JoinFilterAnalysis(
-          true,
-          selectorFilter,
+          false,
           selectorFilter,
-          pushdownVirtualColumns
+          selectorFilter
       );
     }
 
-    Optional<List<JoinFilterColumnCorrelationAnalysis>> correlationAnalyses = 
joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
-                                                                               
                    .get(filteringColumn);
+    List<JoinFilterColumnCorrelationAnalysis> correlationAnalyses = 
joinFilterPreAnalysis.getCorrelationsByFilteringColumn()
+                                                                               
          .get(filteringColumn);
 
-    if (!correlationAnalyses.isPresent()) {
+    if (correlationAnalyses == null) {
       return JoinFilterAnalysis.createNoPushdownFilterAnalysis(selectorFilter);
     }
 
-
-    for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : 
correlationAnalyses.get()) {
+    for (JoinFilterColumnCorrelationAnalysis correlationAnalysis : 
correlationAnalyses) {
       if (correlationAnalysis.supportsPushDown()) {
         Optional<Set<String>> correlatedValues = 
correlationAnalysis.getCorrelatedValuesMap().get(
             Pair.of(filteringColumn, filteringValue)
@@ -518,17 +724,20 @@ public class JoinFilterAnalyzer
 
         for (Expr correlatedBaseExpr : 
correlationAnalysis.getBaseExpressions()) {
           // We need to create a virtual column for the expressions when 
pushing down
-          String vcName = 
getCorrelatedBaseExprVirtualColumnName(pushdownVirtualColumns.size());
-
-          VirtualColumn correlatedBaseExprVirtualColumn = new 
ExpressionVirtualColumn(
-              vcName,
+          VirtualColumn pushDownVirtualColumn = 
pushDownVirtualColumnsForLhsExprs.computeIfAbsent(
               correlatedBaseExpr,
-              ValueType.STRING
+              (expr) -> {
+                String vcName = 
getCorrelatedBaseExprVirtualColumnName(pushDownVirtualColumnsForLhsExprs.size());
+                return new ExpressionVirtualColumn(
+                    vcName,
+                    correlatedBaseExpr,
+                    ValueType.STRING
+                );
+              }
           );
-          pushdownVirtualColumns.add(correlatedBaseExprVirtualColumn);
 
           Filter rewrittenFilter = new InDimFilter(
-              vcName,
+              pushDownVirtualColumn.getOutputName(),
               correlatedValues.get(),
               null,
               null
@@ -545,8 +754,7 @@ public class JoinFilterAnalyzer
     return new JoinFilterAnalysis(
         true,
         selectorFilter,
-        Filters.and(newFilters),
-        pushdownVirtualColumns
+        Filters.and(newFilters)
     );
   }
 
@@ -593,29 +801,30 @@ public class JoinFilterAnalyzer
    * For each rhs column that appears in the equiconditions for a table's 
JoinableClause,
    * we try to determine what base table columns are related to the rhs column 
through the total set of equiconditions.
    * We do this by searching backwards through the chain of join 
equiconditions using the provided equicondition map.
-   *
+   * 
    * For example, suppose we have 3 tables, A,B,C, joined with the following 
conditions, where A is the base table:
-   *   A.joinColumn == B.joinColumn
-   *   B.joinColum == C.joinColumn
-   *
+   * A.joinColumn == B.joinColumn
+   * B.joinColum == C.joinColumn
+   * 
    * We would determine that C.joinColumn is correlated with A.joinColumn: we 
first see that
    * C.joinColumn is linked to B.joinColumn which in turn is linked to 
A.joinColumn
-   *
+   * 
    * Suppose we had the following join conditions instead:
-   *   f(A.joinColumn) == B.joinColumn
-   *   B.joinColum == C.joinColumn
+   * f(A.joinColumn) == B.joinColumn
+   * B.joinColum == C.joinColumn
    * In this case, the JoinFilterColumnCorrelationAnalysis for C.joinColumn 
would be linked to f(A.joinColumn).
-   *
+   * 
    * Suppose we had the following join conditions instead:
-   *   A.joinColumn == B.joinColumn
-   *   f(B.joinColum) == C.joinColumn
-   *
+   * A.joinColumn == B.joinColumn
+   * f(B.joinColum) == C.joinColumn
+   * 
    * Because we cannot reverse the function f() applied to the second table B 
in all cases,
    * we cannot relate C.joinColumn to A.joinColumn, and we would not generate 
a correlation for C.joinColumn
    *
-   * @param tablePrefix          Prefix for a join table
-   * @param clauseForTablePrefix Joinable clause for the prefix
-   * @param equiConditions       Map of equiconditions, keyed by the right 
hand columns
+   * @param joinableClauses     List of joinable clauses for the query
+   * @param tablePrefix         Prefix for a join table
+   * @param rhsRewriteCandidate RHS rewrite candidate that we find correlated 
base table columns for
+   * @param equiConditions      Map of equiconditions, keyed by the right hand 
columns
    *
    * @return A list of correlatation analyses for the equicondition RHS 
columns that reside in the table associated with
    * the tablePrefix
@@ -623,18 +832,24 @@ public class JoinFilterAnalyzer
   private static Optional<Map<String, JoinFilterColumnCorrelationAnalysis>> 
findCorrelatedBaseTableColumns(
       List<JoinableClause> joinableClauses,
       String tablePrefix,
-      JoinableClause clauseForTablePrefix,
+      RhsRewriteCandidate rhsRewriteCandidate,
       Map<String, Set<Expr>> equiConditions
   )
   {
+    JoinableClause clauseForTablePrefix = 
rhsRewriteCandidate.getJoinableClause();
     JoinConditionAnalysis jca = clauseForTablePrefix.getCondition();
 
     Set<String> rhsColumns = new HashSet<>();
-    for (Equality eq : jca.getEquiConditions()) {
-      rhsColumns.add(tablePrefix + eq.getRightColumn());
+    if (rhsRewriteCandidate.isDirectRewrite()) {
+      // If we filter on a RHS join column, we only need to consider that 
column from the RHS side
+      rhsColumns.add(rhsRewriteCandidate.getRhsColumn());
+    } else {
+      for (Equality eq : jca.getEquiConditions()) {
+        rhsColumns.add(tablePrefix + eq.getRightColumn());
+      }
     }
 
-    Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new 
HashMap<>();
+    Map<String, JoinFilterColumnCorrelationAnalysis> correlations = new 
LinkedHashMap<>();
 
     for (String rhsColumn : rhsColumns) {
       Set<String> correlatedBaseColumns = new HashSet<>();
@@ -674,9 +889,9 @@ public class JoinFilterAnalyzer
    * and/or expressions for a single RHS column and adds them to the provided 
sets as it traverses the
    * equicondition column relationships.
    *
-   * @param equiConditions Map of equiconditions, keyed by the right hand 
columns
-   * @param rhsColumn RHS column to find base table correlations for
-   * @param correlatedBaseColumns Set of correlated base column names for the 
provided RHS column. Will be modified.
+   * @param equiConditions            Map of equiconditions, keyed by the 
right hand columns
+   * @param rhsColumn                 RHS column to find base table 
correlations for
+   * @param correlatedBaseColumns     Set of correlated base column names for 
the provided RHS column. Will be modified.
    * @param correlatedBaseExpressions Set of correlated base column 
expressions for the provided RHS column. Will be
    *                                  modified.
    */
@@ -747,9 +962,15 @@ public class JoinFilterAnalyzer
       List<JoinFilterColumnCorrelationAnalysis> originalList
   )
   {
-    Map<List<String>, JoinFilterColumnCorrelationAnalysis> uniquesMap = new 
HashMap<>();
+    Map<Set<String>, JoinFilterColumnCorrelationAnalysis> uniquesMap = new 
HashMap<>();
+
     for (JoinFilterColumnCorrelationAnalysis jca : originalList) {
-      uniquesMap.put(jca.getBaseColumns(), jca);
+      Set<String> mapKey = new HashSet<>(jca.getBaseColumns());
+      for (Expr expr : jca.getBaseExpressions()) {
+        mapKey.add(expr.stringify());
+      }
+
+      uniquesMap.put(mapKey, jca);
     }
 
     return new ArrayList<>(uniquesMap.values());
@@ -833,6 +1054,7 @@ public class JoinFilterAnalyzer
 
   private static class RhsRewriteCandidate
   {
+    private final boolean isDirectRewrite;
     private final JoinableClause joinableClause;
     private final String rhsColumn;
     private final String valueForRewrite;
@@ -840,12 +1062,14 @@ public class JoinFilterAnalyzer
     public RhsRewriteCandidate(
         JoinableClause joinableClause,
         String rhsColumn,
-        String valueForRewrite
+        String valueForRewrite,
+        boolean isDirectRewrite
     )
     {
       this.joinableClause = joinableClause;
       this.rhsColumn = rhsColumn;
       this.valueForRewrite = valueForRewrite;
+      this.isDirectRewrite = isDirectRewrite;
     }
 
     public JoinableClause getJoinableClause()
@@ -862,5 +1086,16 @@ public class JoinFilterAnalyzer
     {
       return valueForRewrite;
     }
+
+    /**
+     * A direct rewrite occurs when we filter on an RHS column that is also 
part of a join equicondition.
+     *
+     * For example, if we have the filter (j.x = 'hello') and the join 
condition is (y = j.x), we can directly
+     * rewrite the j.x filter to (y = 'hello').
+     */
+    public boolean isDirectRewrite()
+    {
+      return isDirectRewrite;
+    }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java
 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java
index a2ec5b9..991147b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterPreAnalysis.java
@@ -19,13 +19,14 @@
 
 package org.apache.druid.segment.join.filter;
 
+import org.apache.druid.math.expr.Expr;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.join.JoinableClause;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
 
 /**
  * A JoinFilterPreAnalysis contains filter push down/rewrite information that 
does not have per-segment dependencies.
@@ -35,6 +36,7 @@ import java.util.Optional;
  * - A list of filter clauses from the original filter's CNF representation 
that only reference the base table
  * - A list of filter clauses from the original filter's CNF representation 
that reference RHS join tables
  * - A mapping of RHS filtering columns -> 
List<JoinFilterColumnCorrelationAnalysis>, used for filter rewrites
+ * - A second mapping of RHS filtering columns -> 
List<JoinFilterColumnCorrelationAnalysis>, used for direct filter rewrites
  * - A list of virtual columns that can only be computed post-join
  * - Control flag booleans for whether filter push down and RHS rewrites are 
enabled.
  */
@@ -44,10 +46,12 @@ public class JoinFilterPreAnalysis
   private final Filter originalFilter;
   private final List<Filter> normalizedBaseTableClauses;
   private final List<Filter> normalizedJoinTableClauses;
-  private final Map<String, 
Optional<List<JoinFilterColumnCorrelationAnalysis>>> 
correlationsByFilteringColumn;
+  private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlationsByFilteringColumn;
+  private final Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlationsByDirectFilteringColumn;
   private final boolean enableFilterPushDown;
   private final boolean enableFilterRewrite;
   private final List<VirtualColumn> postJoinVirtualColumns;
+  private final Map<String, Set<Expr>> equiconditions;
 
   public JoinFilterPreAnalysis(
       final List<JoinableClause> joinableClauses,
@@ -55,9 +59,11 @@ public class JoinFilterPreAnalysis
       final List<VirtualColumn> postJoinVirtualColumns,
       final List<Filter> normalizedBaseTableClauses,
       final List<Filter> normalizedJoinTableClauses,
-      final Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> 
correlationsByFilteringColumn,
+      final Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlationsByFilteringColumn,
+      final Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
correlationsByDirectFilteringColumn,
       final boolean enableFilterPushDown,
-      final boolean enableFilterRewrite
+      final boolean enableFilterRewrite,
+      final Map<String, Set<Expr>> equiconditions
   )
   {
     this.joinableClauses = joinableClauses;
@@ -66,8 +72,10 @@ public class JoinFilterPreAnalysis
     this.normalizedBaseTableClauses = normalizedBaseTableClauses;
     this.normalizedJoinTableClauses = normalizedJoinTableClauses;
     this.correlationsByFilteringColumn = correlationsByFilteringColumn;
+    this.correlationsByDirectFilteringColumn = 
correlationsByDirectFilteringColumn;
     this.enableFilterPushDown = enableFilterPushDown;
     this.enableFilterRewrite = enableFilterRewrite;
+    this.equiconditions = equiconditions;
   }
 
   public List<JoinableClause> getJoinableClauses()
@@ -95,11 +103,16 @@ public class JoinFilterPreAnalysis
     return normalizedJoinTableClauses;
   }
 
-  public Map<String, Optional<List<JoinFilterColumnCorrelationAnalysis>>> 
getCorrelationsByFilteringColumn()
+  public Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
getCorrelationsByFilteringColumn()
   {
     return correlationsByFilteringColumn;
   }
 
+  public Map<String, List<JoinFilterColumnCorrelationAnalysis>> 
getCorrelationsByDirectFilteringColumn()
+  {
+    return correlationsByDirectFilteringColumn;
+  }
+
   public boolean isEnableFilterPushDown()
   {
     return enableFilterPushDown;
@@ -109,5 +122,10 @@ public class JoinFilterPreAnalysis
   {
     return enableFilterRewrite;
   }
+
+  public Map<String, Set<Expr>> getEquiconditions()
+  {
+    return equiconditions;
+  }
 }
 
diff --git 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java
 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java
index 10235ca..60abcd6 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/join/filter/JoinFilterSplit.java
@@ -23,9 +23,9 @@ import org.apache.druid.query.filter.Filter;
 import org.apache.druid.segment.VirtualColumn;
 
 import javax.annotation.Nullable;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * Holds the result of splitting a filter into:
@@ -37,12 +37,12 @@ public class JoinFilterSplit
 {
   final Optional<Filter> baseTableFilter;
   final Optional<Filter> joinTableFilter;
-  final List<VirtualColumn> pushDownVirtualColumns;
+  final Set<VirtualColumn> pushDownVirtualColumns;
 
   public JoinFilterSplit(
       @Nullable Filter baseTableFilter,
       @Nullable Filter joinTableFilter,
-      List<VirtualColumn> pushDownVirtualColumns
+      Set<VirtualColumn> pushDownVirtualColumns
   )
   {
     this.baseTableFilter = baseTableFilter == null ? Optional.empty() : 
Optional.of(baseTableFilter);
@@ -60,7 +60,7 @@ public class JoinFilterSplit
     return joinTableFilter;
   }
 
-  public List<VirtualColumn> getPushDownVirtualColumns()
+  public Set<VirtualColumn> getPushDownVirtualColumns()
   {
     return pushDownVirtualColumns;
   }
diff --git 
a/processing/src/test/java/org/apache/druid/query/filter/LikeDimFilterTest.java 
b/processing/src/test/java/org/apache/druid/query/filter/LikeDimFilterTest.java
index e918a66..3d6ae67 100644
--- 
a/processing/src/test/java/org/apache/druid/query/filter/LikeDimFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/filter/LikeDimFilterTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.filter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
 import org.junit.Assert;
@@ -68,4 +69,13 @@ public class LikeDimFilterTest
     final DimFilter filter = new LikeDimFilter("foo", "bar%", "@", new 
SubstringDimExtractionFn(1, 2));
     Assert.assertEquals(filter.getRequiredColumns(), Sets.newHashSet("foo"));
   }
+
+  @Test
+  public void test_LikeMatcher_equals()
+  {
+    EqualsVerifier.forClass(LikeDimFilter.LikeMatcher.class)
+                  .usingGetClass()
+                  .withNonnullFields("suffixMatch", "prefix", "pattern")
+                  .verify();
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java 
b/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java
index cda4eba..c0c8674 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/BoundFilterTest.java
@@ -21,19 +21,25 @@ package org.apache.druid.segment.filter;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.extraction.JavaScriptExtractionFn;
 import org.apache.druid.query.filter.BoundDimFilter;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.ordering.StringComparators;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -60,6 +66,9 @@ public class BoundFilterTest extends BaseFilterTest
     super(testName, ROWS, indexBuilder, finisher, cnf, optimize);
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -724,6 +733,26 @@ public class BoundFilterTest extends BaseFilterTest
   }
 
   @Test
+  public void testRequiredColumnRewrite()
+  {
+    BoundFilter filter = new BoundFilter(
+        new BoundDimFilter("dim0", "", "", false, false, true, null, 
StringComparators.ALPHANUMERIC)
+    );
+    BoundFilter filter2 = new BoundFilter(
+        new BoundDimFilter("dim1", "", "", false, false, true, null, 
StringComparators.ALPHANUMERIC)
+    );
+    Assert.assertTrue(filter.supportsRequiredColumnRewrite());
+    Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
+
+    Filter rewrittenFilter = 
filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
+    Assert.assertEquals(filter2, rewrittenFilter);
+
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Received a non-applicable rewrite: 
{invalidName=dim1}, filter's dimension: dim0");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
+  @Test
   public void test_equals()
   {
     EqualsVerifier.forClass(BoundFilter.class)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
 
b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
index 2687be9..3134fad 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java
@@ -37,12 +37,15 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.ExpressionDimFilter;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -111,6 +114,9 @@ public class ExpressionFilterTest extends BaseFilterTest
     );
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -273,6 +279,17 @@ public class ExpressionFilterTest extends BaseFilterTest
     Assert.assertEquals(edf("missing == ''").getRequiredColumns(), 
Sets.newHashSet("missing"));
   }
 
+  @Test
+  public void testRequiredColumnRewrite()
+  {
+    Filter filter = edf("dim1 == '1'").toFilter();
+    Assert.assertFalse(filter.supportsRequiredColumnRewrite());
+
+    expectedException.expect(UnsupportedOperationException.class);
+    expectedException.expectMessage("Required column rewrite is not supported 
by this filter.");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
   private static ExpressionDimFilter edf(final String expression)
   {
     return new ExpressionDimFilter(expression, null, 
TestExprMacroTable.INSTANCE);
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/FilterCnfConversionTest.java
 
b/processing/src/test/java/org/apache/druid/segment/filter/FilterCnfConversionTest.java
index 6857e93..a169285 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/FilterCnfConversionTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/FilterCnfConversionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.filter;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.dimension.DimensionSpec;
@@ -354,6 +355,16 @@ public class FilterCnfConversionTest
     assertFilter(filter, expectedCnf, Filters.toCnf(filter));
   }
 
+  @Test
+  public void testTrueFalseFilterRequiredColumnRewrite()
+  {
+    Assert.assertTrue(TrueFilter.instance().supportsRequiredColumnRewrite());
+    Assert.assertTrue(FalseFilter.instance().supportsRequiredColumnRewrite());
+
+    Assert.assertEquals(TrueFilter.instance(), 
TrueFilter.instance().rewriteRequiredColumns(ImmutableMap.of()));
+    Assert.assertEquals(FalseFilter.instance(), 
FalseFilter.instance().rewriteRequiredColumns(ImmutableMap.of()));
+  }
+
   private void assertFilter(Filter original, Filter expectedConverted, Filter 
actualConverted)
   {
     assertEquivalent(original, expectedConverted);
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java 
b/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java
index 4ca9192..0e233ad 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/InFilterTest.java
@@ -32,19 +32,24 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.extraction.JavaScriptExtractionFn;
 import org.apache.druid.query.extraction.MapLookupExtractor;
 import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.lookup.LookupExtractionFn;
 import org.apache.druid.query.lookup.LookupExtractor;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -85,6 +90,9 @@ public class InFilterTest extends BaseFilterTest
     super(testName, ROWS, indexBuilder, finisher, cnf, optimize);
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -354,6 +362,23 @@ public class InFilterTest extends BaseFilterTest
   }
 
   @Test
+  public void testRequiredColumnRewrite()
+  {
+    InFilter filter = (InFilter) toInFilter("dim0", "a", "c").toFilter();
+    InFilter filter2 = (InFilter) toInFilter("dim1", "a", "c").toFilter();
+
+    Assert.assertTrue(filter.supportsRequiredColumnRewrite());
+    Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
+
+    Filter rewrittenFilter = 
filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
+    Assert.assertEquals(filter2, rewrittenFilter);
+
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Received a non-applicable rewrite: 
{invalidName=dim1}, filter's dimension: dim0");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
+  @Test
   public void test_equals()
   {
     EqualsVerifier.forClass(InFilter.class)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/JavaScriptFilterTest.java
 
b/processing/src/test/java/org/apache/druid/segment/filter/JavaScriptFilterTest.java
index 510908d..61e30b9 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/JavaScriptFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/JavaScriptFilterTest.java
@@ -27,13 +27,17 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.extraction.MapLookupExtractor;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.JavaScriptDimFilter;
 import org.apache.druid.query.lookup.LookupExtractionFn;
 import org.apache.druid.query.lookup.LookupExtractor;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -54,6 +58,9 @@ public class JavaScriptFilterTest extends BaseFilterTest
     super(testName, DEFAULT_ROWS, indexBuilder, finisher, cnf, optimize);
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -221,6 +228,17 @@ public class JavaScriptFilterTest extends BaseFilterTest
     assertFilterMatchesSkipVectorize(newJavaScriptDimFilter("l0", 
jsNumericValueFilter("9001"), null), ImmutableList.of("4"));
   }
 
+  @Test
+  public void testRequiredColumnRewrite()
+  {
+    Filter filter = newJavaScriptDimFilter("dim3", jsValueFilter("a"), 
null).toFilter();
+    Assert.assertFalse(filter.supportsRequiredColumnRewrite());
+
+    expectedException.expect(UnsupportedOperationException.class);
+    expectedException.expectMessage("Required column rewrite is not supported 
by this filter.");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
   private JavaScriptDimFilter newJavaScriptDimFilter(
       final String dimension,
       final String function,
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/LikeFilterTest.java 
b/processing/src/test/java/org/apache/druid/segment/filter/LikeFilterTest.java
index 4fabf15..5f354c3 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/LikeFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/LikeFilterTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.filter;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -30,13 +31,18 @@ import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.extraction.SubstringDimExtractionFn;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.LikeDimFilter;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -77,6 +83,9 @@ public class LikeFilterTest extends BaseFilterTest
     super(testName, ROWS, indexBuilder, finisher, cnf, optimize);
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -262,4 +271,30 @@ public class LikeFilterTest extends BaseFilterTest
         ImmutableList.of("6")
     );
   }
+
+  @Test
+  public void testRequiredColumnRewrite()
+  {
+    Filter filter = new LikeDimFilter("dim0", "e%", null, new 
SubstringDimExtractionFn(1, 100)).toFilter();
+    Filter filter2 = new LikeDimFilter("dim1", "e%", null, new 
SubstringDimExtractionFn(1, 100)).toFilter();
+
+    Assert.assertTrue(filter.supportsRequiredColumnRewrite());
+    Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
+
+    Filter rewrittenFilter = 
filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
+    Assert.assertEquals(filter2, rewrittenFilter);
+
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Received a non-applicable rewrite: 
{invalidName=dim1}, filter's dimension: dim0");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
+  @Test
+  public void test_equals()
+  {
+    EqualsVerifier.forClass(LikeFilter.class)
+                  .usingGetClass()
+                  .withNonnullFields("dimension", "likeMatcher")
+                  .verify();
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/NotFilterTest.java 
b/processing/src/test/java/org/apache/druid/segment/filter/NotFilterTest.java
index d3f34e1..b64b415 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/NotFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/NotFilterTest.java
@@ -19,7 +19,10 @@
 
 package org.apache.druid.segment.filter;
 
+import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.filter.ExpressionDimFilter;
 import org.apache.druid.query.filter.Filter;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,4 +42,21 @@ public class NotFilterTest
     final Filter notFilter = FilterTestUtils.not(baseFilter);
     Assert.assertNotEquals(notFilter.hashCode(), baseFilter.hashCode());
   }
+
+  @Test
+  public void testRequiredColumnRewrite()
+  {
+    Filter filter = new NotFilter(new SelectorFilter("dim0", "B"));
+    Filter filter2 = new NotFilter(new SelectorFilter("dim1", "B"));
+
+    Assert.assertTrue(filter.supportsRequiredColumnRewrite());
+    Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
+
+    Filter rewrittenFilter = 
filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
+    Assert.assertEquals(filter2, rewrittenFilter);
+
+    Filter filter3 = new NotFilter(new ExpressionDimFilter("dim0 == 'B'", 
ExprMacroTable.nil()).toFilter());
+    Assert.assertFalse(filter3.supportsRequiredColumnRewrite());
+  }
+
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/RegexFilterTest.java 
b/processing/src/test/java/org/apache/druid/segment/filter/RegexFilterTest.java
index 7a4b0d8..6e988c5 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/RegexFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/RegexFilterTest.java
@@ -21,16 +21,23 @@ package org.apache.druid.segment.filter;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.extraction.JavaScriptExtractionFn;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.RegexDimFilter;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -50,6 +57,9 @@ public class RegexFilterTest extends BaseFilterTest
     super(testName, DEFAULT_ROWS, indexBuilder, finisher, cnf, optimize);
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -135,4 +145,31 @@ public class RegexFilterTest extends BaseFilterTest
     assertFilterMatches(new RegexDimFilter("dim4", ".*ANYMORE", changeNullFn), 
ImmutableList.of("0", "1", "2", "3", "4", "5"));
     assertFilterMatches(new RegexDimFilter("dim4", "a.*", changeNullFn), 
ImmutableList.of());
   }
+
+  @Test
+  public void testRequiredColumnRewrite()
+  {
+    Filter filter = new RegexDimFilter("dim0", ".*", null).toFilter();
+    Filter filter2 = new RegexDimFilter("dim1", ".*", null).toFilter();
+
+    Assert.assertTrue(filter.supportsRequiredColumnRewrite());
+    Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
+
+    Filter rewrittenFilter = 
filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
+    Assert.assertEquals(filter2, rewrittenFilter);
+
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Received a non-applicable rewrite: 
{invalidName=dim1}, filter's dimension: dim0");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
+  @Test
+  public void test_equals()
+  {
+    EqualsVerifier.forClass(RegexFilter.class)
+                  .usingGetClass()
+                  .withNonnullFields("dimension", "pattern")
+                  .withIgnoredFields("predicateFactory")
+                  .verify();
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/filter/SearchQueryFilterTest.java
 
b/processing/src/test/java/org/apache/druid/segment/filter/SearchQueryFilterTest.java
index 31ae1c4..c28fb50 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/filter/SearchQueryFilterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/filter/SearchQueryFilterTest.java
@@ -21,18 +21,25 @@ package org.apache.druid.segment.filter;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.js.JavaScriptConfig;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.extraction.JavaScriptExtractionFn;
+import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.SearchQueryDimFilter;
 import org.apache.druid.query.search.ContainsSearchQuerySpec;
 import org.apache.druid.query.search.SearchQuerySpec;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.StorageAdapter;
 import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -52,6 +59,9 @@ public class SearchQueryFilterTest extends BaseFilterTest
     super(testName, DEFAULT_ROWS, indexBuilder, finisher, cnf, optimize);
   }
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @AfterClass
   public static void tearDown() throws Exception
   {
@@ -172,4 +182,31 @@ public class SearchQueryFilterTest extends BaseFilterTest
     assertFilterMatches(new SearchQueryDimFilter("dim4", 
specForValue("ANYMORE"), changeNullFn), ImmutableList.of("0", "1", "2", "3", 
"4", "5"));
     assertFilterMatches(new SearchQueryDimFilter("dim4", specForValue("a"), 
changeNullFn), ImmutableList.of());
   }
+
+  @Test
+  public void testRequiredColumnRewrite()
+  {
+    Filter filter = new SearchQueryDimFilter("dim0", specForValue("a"), 
null).toFilter();
+    Filter filter2 = new SearchQueryDimFilter("dim1", specForValue("a"), 
null).toFilter();
+
+    Assert.assertTrue(filter.supportsRequiredColumnRewrite());
+    Assert.assertTrue(filter2.supportsRequiredColumnRewrite());
+
+    Filter rewrittenFilter = 
filter.rewriteRequiredColumns(ImmutableMap.of("dim0", "dim1"));
+    Assert.assertEquals(filter2, rewrittenFilter);
+
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage("Received a non-applicable rewrite: 
{invalidName=dim1}, filter's dimension: dim0");
+    filter.rewriteRequiredColumns(ImmutableMap.of("invalidName", "dim1"));
+  }
+
+  @Test
+  public void test_equals()
+  {
+    EqualsVerifier.forClass(SearchQueryFilter.class)
+                  .usingGetClass()
+                  .withNonnullFields("dimension", "query")
+                  .withIgnoredFields("predicateFactory")
+                  .verify();
+  }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java
 
b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java
index 5c8d903..753b359 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/join/JoinFilterAnalyzerTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -30,8 +31,10 @@ import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.BoundDimFilter;
+import org.apache.druid.query.filter.ExpressionDimFilter;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.filter.AndFilter;
@@ -48,6 +51,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Set;
 
 public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTest
 {
@@ -74,7 +78,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
         new SelectorFilter("channel", "#en.wikipedia"),
         null,
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -112,7 +116,6 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     );
   }
 
-
   @Test
   public void 
test_filterPushDown_factToRegionExprToCountryLeftFilterOnCountryName()
   {
@@ -151,7 +154,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
         null,
         new SelectorFilter("rtc.countryName", "United States"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -210,7 +213,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter("rtc.countryName", "United States"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -277,7 +280,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter("r1.regionName", null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
 
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
@@ -320,6 +323,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     Filter originalFilter = new AndFilter(
         ImmutableList.of(
             new SelectorFilter("baseTableInvalidColumn", "abcd"),
+            new SelectorFilter("baseTableInvalidColumn2", null),
             new SelectorFilter("rtc.invalidColumn", "abcd"),
             new SelectorFilter("r1.invalidColumn", "abcd")
         )
@@ -340,11 +344,12 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
         new SelectorFilter("baseTableInvalidColumn", "abcd"),
         new AndFilter(
             ImmutableList.of(
+                new SelectorFilter("baseTableInvalidColumn2", null),
                 new SelectorFilter("rtc.invalidColumn", "abcd"),
                 new SelectorFilter("r1.invalidColumn", "abcd")
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -406,7 +411,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
         new SelectorFilter("v1", "virtual-column-#en.wikipedia"),
         null,
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -484,7 +489,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
         null,
         new SelectorFilter("v0", "VIRGINIA"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -646,7 +651,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 )
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -717,7 +722,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter("rtc.countryName", "States United"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
 
@@ -736,7 +741,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
         actualFilterSplit.getJoinTableFilter()
     );
     ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) 
actualFilterSplit.getPushDownVirtualColumns()
-                                                                               
              .get(0);
+                                                                               
              .iterator().next();
     compareExpressionVirtualColumns(expectedVirtualColumn, 
actualVirtualColumn);
 
     JoinTestHelper.verifyCursors(
@@ -887,7 +892,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 )
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -964,7 +969,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter("c1.countryName", "Usca"),
-        ImmutableList.of(
+        ImmutableSet.of(
             expectedVirtualColumn
         )
     );
@@ -978,7 +983,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
         actualFilterSplit.getJoinTableFilter()
     );
     ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) 
actualFilterSplit.getPushDownVirtualColumns()
-                                                                               
              .get(0);
+                                                                               
              .iterator().next();
     compareExpressionVirtualColumns(expectedVirtualColumn, 
actualVirtualColumn);
 
     JoinTestHelper.verifyCursors(
@@ -1053,7 +1058,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter("c1.v", "Usca"),
-        ImmutableList.of(
+        ImmutableSet.of(
             expectedVirtualColumn
         )
     );
@@ -1067,7 +1072,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
         actualFilterSplit.getJoinTableFilter()
     );
     ExpressionVirtualColumn actualVirtualColumn = (ExpressionVirtualColumn) 
actualFilterSplit.getPushDownVirtualColumns()
-                                                                               
              .get(0);
+                                                                               
              .iterator().next();
     compareExpressionVirtualColumns(expectedVirtualColumn, 
actualVirtualColumn);
 
     JoinTestHelper.verifyCursors(
@@ -1119,7 +1124,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", 
"Germany"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1175,7 +1180,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", 
"Germany"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1230,7 +1235,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + 
"countryName", null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1284,7 +1289,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", 
null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1337,7 +1342,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "countryName", 
"Australia"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1399,7 +1404,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "v", 
"Australia"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1460,7 +1465,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + 
"countryName", null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1513,7 +1518,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter(FACT_TO_COUNTRY_ON_NUMBER_PREFIX + "v", 
null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1565,7 +1570,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "countryName", 
"El Salvador"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1621,7 +1626,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", "El 
Salvador"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1676,7 +1681,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + 
"countryName", null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1730,7 +1735,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 new SelectorFilter(FACT_TO_COUNTRY_ON_ISO_CODE_PREFIX + "v", 
null)
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1796,7 +1801,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             )
         ),
         new SelectorFilter("r1.regionName", "Fourems Province"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1840,7 +1845,12 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     List<JoinableClause> joinableClauses = ImmutableList.of(
         factExprToRegon
     );
-    Filter originalFilter = new SelectorFilter("r1.regionName", "Fourems 
Province");
+    Filter originalFilter = new OrFilter(
+        ImmutableList.of(
+            new SelectorFilter("r1.regionName", "Fourems Province"),
+            new SelectorFilter("r1.regionIsoCode", "AAAA")
+        )
+    );
 
     JoinFilterPreAnalysis joinFilterPreAnalysis = simplePreAnalysis(
         joinableClauses,
@@ -1853,9 +1863,14 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     );
 
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
-        new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM"), null, 
null).toFilter(),
-        new SelectorFilter("r1.regionName", "Fourems Province"),
-        ImmutableList.of()
+        new OrFilter(
+            ImmutableList.of(
+                new InDimFilter("regionIsoCode", ImmutableSet.of("MMMM"), 
null, null).toFilter(),
+                new SelectorFilter("regionIsoCode", "AAAA")
+            )
+        ),
+        originalFilter,
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -1879,6 +1894,83 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     );
   }
 
+
+  @Test
+  public void 
test_filterPushDown_factToRegionThreeRHSColumnsAllDirectAndFilterOnRHS()
+  {
+    JoinableClause factExprToRegon = new JoinableClause(
+        FACT_TO_REGION_PREFIX,
+        new IndexedTableJoinable(regionsTable),
+        JoinType.LEFT,
+        JoinConditionAnalysis.forExpression(
+            StringUtils.format(
+                "\"%sregionIsoCode\" == regionIsoCode && \"%scountryIsoCode\" 
== regionIsoCode && \"%sregionName\" == user",
+                FACT_TO_REGION_PREFIX,
+                FACT_TO_REGION_PREFIX,
+                FACT_TO_REGION_PREFIX
+            ),
+            FACT_TO_REGION_PREFIX,
+            ExprMacroTable.nil()
+        )
+    );
+    List<JoinableClause> joinableClauses = ImmutableList.of(
+        factExprToRegon
+    );
+    Filter originalFilter = new OrFilter(
+        ImmutableList.of(
+            new SelectorFilter("r1.regionName", "Fourems Province"),
+            new SelectorFilter("r1.regionIsoCode", "AAAA")
+        )
+    );
+
+    JoinFilterPreAnalysis joinFilterPreAnalysis = simplePreAnalysis(
+        joinableClauses,
+        originalFilter
+    );
+    HashJoinSegmentStorageAdapter adapter = new HashJoinSegmentStorageAdapter(
+        factSegment.asStorageAdapter(),
+        joinableClauses,
+        joinFilterPreAnalysis
+    );
+
+    JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
+        new OrFilter(
+            ImmutableList.of(
+                new SelectorFilter("user", "Fourems Province"),
+                new SelectorFilter("regionIsoCode", "AAAA")
+            )
+        ),
+        null,
+        ImmutableSet.of()
+    );
+    JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
+    Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
+
+    // This query doesn't execute because regionName is not a key column, but 
we can still check the
+    // filter rewrites.
+    expectedException.expect(IAE.class);
+    expectedException.expectMessage(
+        "Cannot build hash-join matcher on non-key-based condition: 
Equality{leftExpr=user, rightColumn='regionName'}"
+    );
+
+    JoinTestHelper.verifyCursors(
+        adapter.makeCursors(
+            originalFilter,
+            Intervals.ETERNITY,
+            VirtualColumns.EMPTY,
+            Granularities.ALL,
+            false,
+            null
+        ),
+        ImmutableList.of(
+            "page",
+            FACT_TO_REGION_PREFIX + "regionName"
+        ),
+        ImmutableList.of()
+    );
+  }
+
+
   @Test
   public void 
test_filterPushDown_factToRegionToCountryLeftFilterOnPageDisablePushDown()
   {
@@ -1906,7 +1998,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
         null,
         new SelectorFilter("page", "Peremptory norm"),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -2002,7 +2094,7 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
                 )
             )
         ),
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -2034,9 +2126,105 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
   @Test
   public void 
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumns()
   {
-    Filter originalFilter = new SelectorFilter("rtc.countryIsoCode", "CA");
+    
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsHelper(false);
+  }
+
+  @Test
+  public void 
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsWithLhsExpr()
+  {
+    
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsHelper(true);
+  }
+
+  private void 
test_filterPushDown_factToRegionToCountryLeftFilterOnRHSJoinConditionColumnsHelper(boolean
 hasLhsExpressionInJoinCondition)
+  {
+    Filter expressionFilter = new ExpressionDimFilter(
+        "\"rtc.countryIsoCode\" == 'CA'",
+        ExprMacroTable.nil()
+    ).toFilter();
+
+    Filter specialSelectorFilter = new SelectorFilter("rtc.countryIsoCode", 
"CA") {
+      @Override
+      public boolean supportsRequiredColumnRewrite()
+      {
+        return false;
+      }
+    };
+
+    Filter originalFilter = new AndFilter(
+        ImmutableList.of(
+            new SelectorFilter("r1.regionIsoCode", "ON"),
+            new SelectorFilter("rtc.countryIsoCode", "CA"),
+            specialSelectorFilter,
+            new BoundFilter(new BoundDimFilter(
+                "rtc.countryIsoCode",
+                "CA",
+                "CB",
+                false,
+                false,
+                null,
+                null,
+                null
+            )),
+            expressionFilter,
+            new InDimFilter("rtc.countryIsoCode", ImmutableSet.of("CA", "CA2", 
"CA3"), null, null).toFilter(),
+            new OrFilter(
+                ImmutableList.of(
+                    new SelectorFilter("channel", "#fr.wikipedia"),
+                    new SelectorFilter("rtc.countryIsoCode", "QQQ"),
+                    new BoundFilter(new BoundDimFilter(
+                        "rtc.countryIsoCode",
+                        "YYY",
+                        "ZZZ",
+                        false,
+                        false,
+                        null,
+                        null,
+                        null
+                ))
+                )
+            ),
+            new OrFilter(
+                ImmutableList.of(
+                    new SelectorFilter("namespace", "Main"),
+                    new SelectorFilter("rtc.countryIsoCode", "ABCDEF"),
+                    new SelectorFilter("rtc.countryName", "Canada"),
+                    new BoundFilter(new BoundDimFilter(
+                        "rtc.countryIsoCode",
+                        "XYZXYZ",
+                        "XYZXYZ",
+                        false,
+                        false,
+                        null,
+                        null,
+                        null
+                    ))
+                )
+            )
+        )
+    );
+
+    JoinableClause factToRegionClause;
+    if (hasLhsExpressionInJoinCondition) {
+      factToRegionClause = new JoinableClause(
+          FACT_TO_REGION_PREFIX,
+          new IndexedTableJoinable(regionsTable),
+          JoinType.LEFT,
+          JoinConditionAnalysis.forExpression(
+              StringUtils.format(
+                  "\"%sregionIsoCode\" == upper(lower(regionIsoCode)) && 
\"%scountryIsoCode\" == upper(lower(countryIsoCode))",
+                  FACT_TO_REGION_PREFIX,
+                  FACT_TO_REGION_PREFIX
+              ),
+              FACT_TO_REGION_PREFIX,
+              ExprMacroTable.nil()
+          )
+      );
+    } else {
+      factToRegionClause = factToRegion(JoinType.LEFT);
+    }
+
     List<JoinableClause> joinableClauses = ImmutableList.of(
-        factToRegion(JoinType.LEFT),
+        factToRegionClause,
         regionToCountry(JoinType.LEFT)
     );
 
@@ -2051,10 +2239,111 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
         joinFilterPreAnalysis
     );
 
+    String rewrittenCountryIsoCodeColumnName = hasLhsExpressionInJoinCondition
+                                               ? 
"JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-0"
+                                               : "countryIsoCode";
+
+
+    String rewrittenRegionIsoCodeColumnName = hasLhsExpressionInJoinCondition
+                                              ? 
"JOIN-FILTER-PUSHDOWN-VIRTUAL-COLUMN-1"
+                                              : "regionIsoCode";
+
+    Set<VirtualColumn> expectedVirtualColumns;
+    if (hasLhsExpressionInJoinCondition) {
+      expectedVirtualColumns = ImmutableSet.of(
+          new ExpressionVirtualColumn(
+              rewrittenRegionIsoCodeColumnName,
+              "(upper [(lower [regionIsoCode])])",
+              ValueType.STRING,
+              ExprMacroTable.nil()
+          ),
+          new ExpressionVirtualColumn(
+              rewrittenCountryIsoCodeColumnName,
+              "(upper [(lower [countryIsoCode])])",
+              ValueType.STRING,
+              ExprMacroTable.nil()
+          )
+      );
+    } else {
+      expectedVirtualColumns = ImmutableSet.of();
+    }
+
     JoinFilterSplit expectedFilterSplit = new JoinFilterSplit(
-        new InDimFilter("countryIsoCode", ImmutableSet.of("CA"), null, 
null).toFilter(),
-        new SelectorFilter("rtc.countryIsoCode", "CA"),
-        ImmutableList.of()
+        new AndFilter(
+            ImmutableList.of(
+                new SelectorFilter(rewrittenRegionIsoCodeColumnName, "ON"),
+                new SelectorFilter(rewrittenCountryIsoCodeColumnName, "CA"),
+                new BoundFilter(new BoundDimFilter(
+                    rewrittenCountryIsoCodeColumnName,
+                    "CA",
+                    "CB",
+                    false,
+                    false,
+                    null,
+                    null,
+                    null
+                )),
+                new InDimFilter(rewrittenCountryIsoCodeColumnName, 
ImmutableSet.of("CA", "CA2", "CA3"), null, null).toFilter(),
+                new InDimFilter(rewrittenCountryIsoCodeColumnName, 
ImmutableSet.of("CA"), null, null).toFilter(),
+                new OrFilter(
+                    ImmutableList.of(
+                        new SelectorFilter("channel", "#fr.wikipedia"),
+                        new SelectorFilter(rewrittenCountryIsoCodeColumnName, 
"QQQ"),
+                        new BoundFilter(new BoundDimFilter(
+                            rewrittenCountryIsoCodeColumnName,
+                            "YYY",
+                            "ZZZ",
+                            false,
+                            false,
+                            null,
+                            null,
+                            null
+                        ))
+                    )
+                ),
+                new OrFilter(
+                    ImmutableList.of(
+                        new SelectorFilter("namespace", "Main"),
+                        new SelectorFilter(rewrittenCountryIsoCodeColumnName, 
"ABCDEF"),
+                        new InDimFilter(rewrittenCountryIsoCodeColumnName, 
ImmutableSet.of("CA"), null, null).toFilter(),
+                        new BoundFilter(new BoundDimFilter(
+                            rewrittenCountryIsoCodeColumnName,
+                            "XYZXYZ",
+                            "XYZXYZ",
+                            false,
+                            false,
+                            null,
+                            null,
+                            null
+                        ))
+                    )
+                )
+            )
+        ),
+        new AndFilter(
+            ImmutableList.of(
+                specialSelectorFilter,
+                expressionFilter,
+                new OrFilter(
+                    ImmutableList.of(
+                        new SelectorFilter("namespace", "Main"),
+                        new SelectorFilter("rtc.countryIsoCode", "ABCDEF"),
+                        new SelectorFilter("rtc.countryName", "Canada"),
+                        new BoundFilter(new BoundDimFilter(
+                            "rtc.countryIsoCode",
+                            "XYZXYZ",
+                            "XYZXYZ",
+                            false,
+                            false,
+                            null,
+                            null,
+                            null
+                        ))
+                    )
+                )
+            )
+        ),
+        expectedVirtualColumns
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
@@ -2074,22 +2363,18 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             REGION_TO_COUNTRY_PREFIX + "countryName"
         ),
         ImmutableList.of(
-            new Object[]{"Didier Leclair", "Ontario", "Canada"},
-            new Object[]{"Les Argonautes", "Quebec", "Canada"},
-            new Object[]{"Sarah Michelle Gellar", "Ontario", "Canada"}
+            new Object[]{"Didier Leclair", "Ontario", "Canada"}
         )
     );
   }
 
-
-
   @Test
   public void 
test_filterPushDown_factToRegionToCountryLeftFilterOnTwoRHSColumnsSameValue()
   {
     Filter originalFilter = new AndFilter(
         ImmutableList.of(
-            new SelectorFilter("r1.regionIsoCode", "CA"),
-            new SelectorFilter("r1.countryIsoCode", "CA")
+            new SelectorFilter("r1.regionName", "California"),
+            new SelectorFilter("r1.extraField", "California")
         )
     );
 
@@ -2113,20 +2398,20 @@ public class JoinFilterAnalyzerTest extends 
BaseHashJoinSegmentStorageAdapterTes
             ImmutableList.of(
                 new AndFilter(
                     ImmutableList.of(
-                        new InDimFilter("countryIsoCode", 
ImmutableSet.of("US"), null, null).toFilter(),
-                        new InDimFilter("regionIsoCode", 
ImmutableSet.of("CA"), null, null).toFilter()
+                        new InDimFilter("countryIsoCode", 
ImmutableSet.of("MMMM", "AAAA"), null, null).toFilter(),
+                        new InDimFilter("regionIsoCode", 
ImmutableSet.of("MMMM", "AAAA"), null, null).toFilter()
                     )
                 ),
                 new AndFilter(
                     ImmutableList.of(
-                        new InDimFilter("countryIsoCode", 
ImmutableSet.of("CA"), null, null).toFilter(),
-                        new InDimFilter("regionIsoCode", ImmutableSet.of("ON", 
"QC"), null, null).toFilter()
+                        new InDimFilter("countryIsoCode", 
ImmutableSet.of("US"), null, null).toFilter(),
+                        new InDimFilter("regionIsoCode", 
ImmutableSet.of("CA"), null, null).toFilter()
                     )
                 )
             )
         ),
         originalFilter,
-        ImmutableList.of()
+        ImmutableSet.of()
     );
     JoinFilterSplit actualFilterSplit = 
JoinFilterAnalyzer.splitFilter(joinFilterPreAnalysis);
     Assert.assertEquals(expectedFilterSplit, actualFilterSplit);
diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java 
b/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java
index b63dc52..0f960de 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java
@@ -99,6 +99,7 @@ public class JoinTestHelper
                   .add("regionIsoCode", ValueType.STRING)
                   .add("countryIsoCode", ValueType.STRING)
                   .add("regionName", ValueType.STRING)
+                  .add("extraField", ValueType.STRING)
                   .build();
 
   private static final ColumnProcessorFactory<Supplier<Object>> SIMPLE_READER =
diff --git a/processing/src/test/resources/wikipedia/regions.json 
b/processing/src/test/resources/wikipedia/regions.json
index 3c74579..68fd285 100644
--- a/processing/src/test/resources/wikipedia/regions.json
+++ b/processing/src/test/resources/wikipedia/regions.json
@@ -17,4 +17,5 @@
 {"regionIsoCode":"VA","countryIsoCode":"US","regionName":"Virginia"}
 {"regionIsoCode":"AV","countryIsoCode":"SU","regionName":"Ainigriv"}
 {"regionIsoCode":"ZZ","countryIsoCode":"USCA","regionName":"Usca City"}
-{"regionIsoCode":"MMMM","countryIsoCode":"MMMM","regionName":"Fourems 
Province"}
+{"regionIsoCode":"MMMM","countryIsoCode":"MMMM","regionName":"Fourems 
Province", "extraField":"California"}
+{"regionIsoCode":"AAAA","countryIsoCode":"AAAA","regionName":"Foureis 
Province", "extraField":"California"}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to