This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new bf32589  ORC-1027: Allows the discovery of filters via the plugin 
interface. (#936)
bf32589 is described below

commit bf32589444802022f530e9f26b7262feb9e2aa1f
Author: Pavan Lanka <[email protected]>
AuthorDate: Thu Oct 14 22:42:01 2021 -0700

    ORC-1027: Allows the discovery of filters via the plugin interface. (#936)
    
    ### What changes were proposed in this pull request?
    Allow injection of filters to be applied during read via a PluginService 
using the Java ServiceLoader.
    
    ### Why are the changes needed?
    The current integration uses SearchArguments; this does allow easy 
integration but cannot be used to represent complex predicates, e.g. predicates 
using a function. In these cases the applicable predicate is not optimized 
using LazyIO.
    
    This provides a means to work around this limitation by injecting user-defined 
filters that are applied during read.
    
    ### How was this patch tested?
    Regression Tests were successful
    Additional Tests were added to test the Plugin functionality.
---
 .../apache/orc/bench/core/filter/FilterBench.java  |   4 +-
 .../apache/orc/bench/core/filter/TestFilter.java   |   4 +-
 java/core/pom.xml                                  |   9 +
 java/core/src/java/org/apache/orc/OrcConf.java     |   9 +
 java/core/src/java/org/apache/orc/Reader.java      |  11 +
 .../org/apache/orc/filter/PluginFilterService.java |  40 ++++
 .../java/org/apache/orc/impl/RecordReaderImpl.java |   6 +-
 .../apache/orc/impl/filter/BatchFilterFactory.java |   2 +-
 .../org/apache/orc/impl/filter/FilterFactory.java  |  51 ++++-
 .../org/apache/orc/TestRowFilteringIOSkip.java     |  37 +++
 .../apache/orc/impl/filter/MyFilterService.java    | 100 +++++++++
 .../orc/impl/filter/TestPluginFilterService.java   |  59 +++++
 .../apache/orc/impl/filter/TestPluginFilters.java  | 250 +++++++++++++++++++++
 .../apache/orc/impl/filter/leaf/TestFilters.java   |   3 +-
 java/pom.xml                                       |   6 +
 15 files changed, 579 insertions(+), 12 deletions(-)

diff --git 
a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java 
b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
index f7fcb16..c984e6d 100644
--- a/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
+++ b/java/bench/core/src/java/org/apache/orc/bench/core/filter/FilterBench.java
@@ -170,7 +170,9 @@ public class FilterBench implements OrcBenchmark {
                                                FilterBenchUtil.schema,
                                                false,
                                                OrcFile.Version.CURRENT,
-                                               normalize);
+                                               normalize,
+                                               null,
+                                               null);
       default:
         throw new IllegalArgumentException();
     }
diff --git 
a/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java 
b/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
index 09f814b..d6a3814 100644
--- a/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
+++ b/java/bench/core/src/test/org/apache/orc/bench/core/filter/TestFilter.java
@@ -110,7 +110,9 @@ public class TestFilter {
                                                    FilterBenchUtil.schema,
                                                    false,
                                                    OrcFile.Version.CURRENT,
-                                                   normalize);
+                                                   normalize,
+                                                   null,
+                                                   null);
           break;
         default:
           throw new IllegalArgumentException();
diff --git a/java/core/pom.xml b/java/core/pom.xml
index 9a369f1..69fe1e2 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -102,6 +102,11 @@
       <artifactId>mockito-junit-jupiter</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -151,10 +156,14 @@
           <ignoredUnusedDeclaredDependencies>
             
<ignoredUnusedDeclaredDependency>org.apache.hadoop:hadoop-hdfs</ignoredUnusedDeclaredDependency>
           </ignoredUnusedDeclaredDependencies>
+          <ignoredUsedUndeclaredDependencies>
+            
<ignoredUsedUndeclaredDependency>com.google.auto.service:auto-service-annotations</ignoredUsedUndeclaredDependency>
+          </ignoredUsedUndeclaredDependencies>
           <ignoredDependencies>
             
<ignoredDependency>org.apache.hive:hive-storage-api</ignoredDependency>
             
<ignoredDependency>org.apache.hadoop:hadoop-client-api</ignoredDependency>
             
<ignoredDependency>org.apache.hadoop:hadoop-client-runtime</ignoredDependency>
+            
<ignoredDependency>com.google.auto.service:auto-service</ignoredDependency>
           </ignoredDependencies>
         </configuration>
       </plugin>
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java 
b/java/core/src/java/org/apache/orc/OrcConf.java
index a20753d..6b64415 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -176,6 +176,15 @@ public enum OrcConf {
                         + "must have the filter\n"
                         + "reapplied to avoid using unset values in the 
unselected rows.\n"
                         + "If unsure please leave this as false."),
+  ALLOW_PLUGIN_FILTER("orc.filter.plugin",
+                      "orc.filter.plugin",
+                      false,
+                      "Enables the use of plugin filters during read. The 
plugin filters "
+                      + "are discovered against the service "
+                      + "org.apache.orc.filter.PluginFilterService, if 
multiple filters are "
+                      + "determined, they are combined using AND. The order of 
application is "
+                      + "non-deterministic and the filter functionality should 
not depend on the "
+                      + "order of application."),
   WRITE_VARIABLE_LENGTH_BLOCKS("orc.write.variable.length.blocks", null, false,
       "A boolean flag as to whether the ORC writer should write variable 
length\n"
       + "HDFS blocks."),
diff --git a/java/core/src/java/org/apache/orc/Reader.java 
b/java/core/src/java/org/apache/orc/Reader.java
index 134dca3..0b88599 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -233,6 +233,7 @@ public interface Reader extends Closeable {
     private boolean includeAcidColumns = true;
     private boolean allowSARGToFilter = false;
     private boolean useSelected = false;
+    private boolean allowPluginFilters = false;
 
     /**
      * @since 1.1.0
@@ -254,6 +255,7 @@ public interface Reader extends Closeable {
           OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.getBoolean(conf);
       allowSARGToFilter = OrcConf.ALLOW_SARG_TO_FILTER.getBoolean(conf);
       useSelected = OrcConf.READER_USE_SELECTED.getBoolean(conf);
+      allowPluginFilters = OrcConf.ALLOW_PLUGIN_FILTER.getBoolean(conf);
     }
 
     /**
@@ -637,6 +639,15 @@ public interface Reader extends Closeable {
       this.useSelected = newValue;
       return this;
     }
+
+    public boolean allowPluginFilters() {
+      return allowPluginFilters;
+    }
+
+    public Options allowPluginFilters(boolean allowPluginFilters) {
+      this.allowPluginFilters = allowPluginFilters;
+      return this;
+    }
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/filter/PluginFilterService.java 
b/java/core/src/java/org/apache/orc/filter/PluginFilterService.java
new file mode 100644
index 0000000..e353df4
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/filter/PluginFilterService.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.filter;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service to determine Plugin filters to be used during read. The plugin 
filters determined are
+ * combined using AND.
+ * The filter is expected to be deterministic (for reattempts) and agnostic of 
the application order
+ * which is non-deterministic.
+ */
+public interface PluginFilterService {
+  /**
+   * Determine the filter for a given read path. The determination is based on 
the path and the
+   * read configuration, this should be carefully considered when using this 
in queries that might
+   * refer to the same table/files with multiple aliases.
+   *
+   * @param filePath The fully qualified file path that is being read
+   * @param config   The read configuration is supplied as input. This should 
not be changed.
+   * @return The plugin filter determined for the given filePath
+   */
+  BatchFilter getFilter(String filePath, Configuration config);
+}
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java 
b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 794de92..aade2ef 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -286,11 +286,15 @@ public class RecordReaderImpl implements RecordReader {
 
     String[] filterCols = null;
     Consumer<OrcFilterContext> filterCallBack = null;
+    String filePath = options.allowPluginFilters() ?
+      fileReader.getFileSystem().makeQualified(fileReader.path).toString() : 
null;
     BatchFilter filter = FilterFactory.createBatchFilter(options,
                                                          
evolution.getReaderBaseSchema(),
                                                          
evolution.isSchemaEvolutionCaseAware(),
                                                          
fileReader.getFileVersion(),
-                                                         false);
+                                                         false,
+                                                         filePath,
+                                                         fileReader.conf);
     if (filter != null) {
       // If a filter is determined then use this
       filterCallBack = filter;
diff --git 
a/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java 
b/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java
index da2afee..af0592a 100644
--- a/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/filter/BatchFilterFactory.java
@@ -91,7 +91,7 @@ class BatchFilterFactory {
     }
   }
 
-  private static class AndBatchFilterImpl implements BatchFilter {
+  static class AndBatchFilterImpl implements BatchFilter {
     private final BatchFilter[] filters;
     private final String[] colNames;
 
diff --git a/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java 
b/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java
index 1902f6c..7cb3ddc 100644
--- a/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/filter/FilterFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.orc.impl.filter;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -25,6 +26,7 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.filter.PluginFilterService;
 import org.apache.orc.impl.filter.leaf.LeafFilterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.ServiceLoader;
 import java.util.Set;
 
 public class FilterFactory {
@@ -46,16 +49,35 @@ public class FilterFactory {
    * @param isSchemaCaseAware identifies if the schema is case-sensitive
    * @param version           provides the ORC file version
    * @param normalize         identifies if the SArg should be normalized or 
not
+   * @param filePath          that is fully qualified to determine plugin 
filter(s)
+   * @param conf              configuration shared when determining Plugin 
filter(s)
    * @return BatchFilter that represents the SearchArgument or null
    */
   public static BatchFilter createBatchFilter(Reader.Options opts,
                                               TypeDescription readSchema,
                                               boolean isSchemaCaseAware,
                                               OrcFile.Version version,
-                                              boolean normalize) {
+                                              boolean normalize,
+                                              String filePath,
+                                              Configuration conf) {
     List<BatchFilter> filters = new ArrayList<>(2);
 
-    // 1. Process SArgFilter
+    // 1. Process input filter
+    if (opts.getFilterCallback() != null) {
+      filters.add(BatchFilterFactory.create(opts.getFilterCallback(),
+                                            opts.getPreFilterColumnNames()));
+    }
+
+    // 2. Process PluginFilter
+    if (opts.allowPluginFilters()) {
+      List<BatchFilter> pluginFilters = findPluginFilters(filePath, conf);
+      if (!pluginFilters.isEmpty()) {
+        LOG.debug("Added plugin filters {} to the read", pluginFilters);
+        filters.addAll(pluginFilters);
+      }
+    }
+
+    // 3. Process SArgFilter
     if (opts.isAllowSARGToFilter() && opts.getSearchArgument() != null) {
       SearchArgument sArg = opts.getSearchArgument();
       Set<String> colNames = new HashSet<>();
@@ -74,11 +96,6 @@ public class FilterFactory {
       }
     }
 
-    // 2. Process input filter
-    if (opts.getFilterCallback() != null) {
-      filters.add(BatchFilterFactory.create(opts.getFilterCallback(),
-                                            opts.getPreFilterColumnNames()));
-    }
     return BatchFilterFactory.create(filters);
   }
 
@@ -147,4 +164,24 @@ public class FilterFactory {
       super(message);
     }
   }
+
+  /**
+   * Find filter(s) for a given file path. The order in which the filter 
services are invoked is
+   * unpredictable.
+   *
+   * @param filePath fully qualified path of the file being evaluated
+   * @param conf     reader configuration of ORC, can be used to configure the 
filter services
+   * @return The plugin filter(s) matching the given file, can be empty if 
none are found
+   */
+  static List<BatchFilter> findPluginFilters(String filePath, Configuration 
conf) {
+    List<BatchFilter> filters = new ArrayList<>();
+    for (PluginFilterService s : 
ServiceLoader.load(PluginFilterService.class)) {
+      LOG.debug("Processing filter service {}", s);
+      BatchFilter filter = s.getFilter(filePath, conf);
+      if (filter != null) {
+        filters.add(filter);
+      }
+    }
+    return filters;
+  }
 }
diff --git a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java 
b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
index 88fd1f9..863bb7d 100644
--- a/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
+++ b/java/core/src/test/org/apache/orc/TestRowFilteringIOSkip.java
@@ -158,6 +158,7 @@ public class TestRowFilteringIOSkip {
     while (cnt > 0) {
       ridx = r.nextInt((int) RowCount);
       readSingleRowWithFilter(ridx);
+      readSingleRowWithPluginFilter(ridx);
       cnt--;
     }
   }
@@ -182,6 +183,29 @@ public class TestRowFilteringIOSkip {
     assertEquals(1, rowCount);
   }
 
+  private void readSingleRowWithPluginFilter(long idx) throws IOException {
+    Configuration localConf = new Configuration(conf);
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(localConf, true);
+    localConf.set("my.filter.name", "my_long_abs_eq");
+    localConf.set("my.filter.col.name", "ridx");
+    localConf.set("my.filter.col.value", String.valueOf(-idx));
+    localConf.set("my.filter.scope", fs.makeQualified(filePath.getParent()) + 
"/.*");
+
+    Reader r = OrcFile.createReader(filePath, 
OrcFile.readerOptions(localConf).filesystem(fs));
+    Reader.Options options = r.options()
+      .useSelected(true)
+      .allowSARGToFilter(true);
+    VectorizedRowBatch b = schema.createRowBatch();
+    long rowCount = 0;
+    try (RecordReader rr = r.rows(options)) {
+      assertTrue(rr.nextBatch(b));
+      validateBatch(b, idx);
+      rowCount += b.size;
+      assertFalse(rr.nextBatch(b));
+    }
+    assertEquals(1, rowCount);
+  }
+
   @Test
   public void readWithoutSelectedSupport() throws IOException {
     // When selected vector is not supported we will read more rows than just 
the filtered rows.
@@ -331,6 +355,19 @@ public class TestRowFilteringIOSkip {
                                   new InFilter(new HashSet<>(0), 0)));
   }
 
+  @Test
+  public void filterAllRowsWPluginFilter() throws IOException {
+    readStart();
+    Configuration localConf = new Configuration(conf);
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(localConf, true);
+    localConf.set("my.filter.name", "my_long_abs_eq");
+    localConf.set("my.filter.col.name", "f1");
+    localConf.set("my.filter.col.value", String.valueOf(Long.MIN_VALUE));
+    localConf.set("my.filter.scope", fs.makeQualified(filePath.getParent()) + 
"/.*");
+    Reader r = OrcFile.createReader(filePath, 
OrcFile.readerOptions(localConf).filesystem(fs));
+    filterAllRows(r, r.options());
+  }
+
   private void filterAllRows(Reader r, Reader.Options options) throws 
IOException {
     VectorizedRowBatch b = schema.createRowBatch();
     long rowCount = 0;
diff --git a/java/core/src/test/org/apache/orc/impl/filter/MyFilterService.java 
b/java/core/src/test/org/apache/orc/impl/filter/MyFilterService.java
new file mode 100644
index 0000000..1968e9a
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/MyFilterService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import com.google.auto.service.AutoService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.orc.filter.BatchFilter;
+import org.apache.orc.filter.PluginFilterService;
+
+import java.util.Locale;
+
+@AutoService(PluginFilterService.class)
+public class MyFilterService implements PluginFilterService {
+  @Override
+  public BatchFilter getFilter(String filePath, Configuration conf) {
+    if (!filePath.matches(conf.get("my.filter.scope", ""))) {
+      return null;
+    }
+
+    switch (conf.get("my.filter.name", "")) {
+      case "my_str_i_eq":
+        return makeFilter(new StringIgnoreCaseEquals(conf));
+      case "my_long_abs_eq":
+        return makeFilter(new LongAbsEquals(conf));
+      default:
+        return null;
+    }
+  }
+
+  private static BatchFilter makeFilter(LeafFilter filter) {
+    return BatchFilterFactory.create(filter, new String[] 
{filter.getColName()});
+  }
+
+  public static class StringIgnoreCaseEquals extends LeafFilter {
+    private final String value;
+    private final Locale locale;
+
+    protected StringIgnoreCaseEquals(Configuration conf) {
+      this(conf.get("my.filter.col.name"),
+           conf.get("my.filter.col.value"),
+           conf.get("my.filter.lang_tag") == null ?
+             Locale.ROOT :
+             Locale.forLanguageTag(conf.get("my.filter.lang_tag")));
+    }
+
+    protected StringIgnoreCaseEquals(String colName, String value, Locale 
locale) {
+      super(colName, false);
+      if (colName.isEmpty()) {
+        throw new IllegalArgumentException("Filter needs a valid column name");
+      }
+      this.locale = locale;
+      this.value = value.toLowerCase(locale);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return ((BytesColumnVector) 
v).toString(rowIdx).toLowerCase(locale).equals(value);
+    }
+  }
+
+  public static class LongAbsEquals extends LeafFilter {
+    private final long value;
+
+    protected LongAbsEquals(Configuration conf) {
+      this(conf.get("my.filter.col.name"),
+           conf.getLong("my.filter.col.value", -1));
+    }
+
+    protected LongAbsEquals(String colName, long value) {
+      super(colName, false);
+      assert !colName.isEmpty() : "Filter needs a valid column name";
+      this.value = Math.abs(value);
+    }
+
+    @Override
+    protected boolean allow(ColumnVector v, int rowIdx) {
+      return Math.abs(((LongColumnVector) v).vector[rowIdx]) == value;
+    }
+  }
+
+}
diff --git 
a/java/core/src/test/org/apache/orc/impl/filter/TestPluginFilterService.java 
b/java/core/src/test/org/apache/orc/impl/filter/TestPluginFilterService.java
new file mode 100644
index 0000000..757ea5d
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestPluginFilterService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestPluginFilterService {
+  private final Configuration conf;
+
+  public TestPluginFilterService() {
+    conf = new Configuration();
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "aBcd");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+  }
+
+  @Test
+  public void testFoundFilter() {
+    conf.set("my.filter.name", "my_str_i_eq");
+    assertNotNull(FilterFactory.findPluginFilters("file://db/table1/file1", 
conf));
+  }
+
+  @Test
+  public void testErrorCreatingFilter() {
+    Configuration localConf = new Configuration(conf);
+    localConf.set("my.filter.name", "my_str_i_eq");
+    localConf.set("my.filter.col.name", "");
+    assertThrows(IllegalArgumentException.class,
+                 () -> 
FilterFactory.findPluginFilters("file://db/table1/file1", localConf),
+                 "Filter needs a valid column name");
+  }
+
+  @Test
+  public void testMissingFilter() {
+    assertTrue(FilterFactory.findPluginFilters("file://db/table11/file1", 
conf).isEmpty());
+  }
+}
\ No newline at end of file
diff --git 
a/java/core/src/test/org/apache/orc/impl/filter/TestPluginFilters.java 
b/java/core/src/test/org/apache/orc/impl/filter/TestPluginFilters.java
new file mode 100644
index 0000000..bc5535f
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/filter/TestPluginFilters.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.filter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFilterContext;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.filter.BatchFilter;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestPluginFilters extends ATestFilter {
+
+  @Test
+  public void testPluginFilterWithSArg() {
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"a", "B", "c", "dE", "e", "f"});
+
+    // Define the plugin filter
+    Configuration conf = new Configuration();
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(conf, true);
+    conf.set("my.filter.name", "my_str_i_eq");
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "de");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 2L, 4L, 6L)
+      .build();
+
+    // Setup Options
+    Reader.Options opts = new Reader.Options(conf)
+      .searchArgument(sArg, new String[] {"f1"})
+      .allowSARGToFilter(true);
+    BatchFilter f = FilterFactory.createBatchFilter(opts,
+                                                    schema,
+                                                    false,
+                                                    OrcFile.Version.CURRENT,
+                                                    false,
+                                                    "file://db/table1/file1",
+                                                    conf);
+    assertTrue(f instanceof BatchFilterFactory.AndBatchFilterImpl,
+               "Filter should be an AND Batch filter");
+    assertArrayEquals(new String[] {"f1", "f2"}, f.getColumnNames());
+    f.accept(fc.setBatch(batch));
+    validateSelected(3);
+  }
+
+  @Test
+  public void testPluginSelectsNone() {
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"a", "B", "c", "dE", "e", "f"});
+
+    // Define the plugin filter
+    Configuration conf = new Configuration();
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(conf, true);
+    conf.set("my.filter.name", "my_str_i_eq");
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "g");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 2L, 4L, 6L)
+      .build();
+
+    // Setup Options
+    Reader.Options opts = new Reader.Options(conf)
+      .searchArgument(sArg, new String[] {"f1"})
+      .allowSARGToFilter(true);
+    BatchFilter f = FilterFactory.createBatchFilter(opts,
+                                                    schema,
+                                                    false,
+                                                    OrcFile.Version.CURRENT,
+                                                    false,
+                                                    "file://db/table1/file1",
+                                                    conf);
+    assertTrue(f instanceof BatchFilterFactory.AndBatchFilterImpl,
+               "Filter should be an AND Batch filter");
+    f.accept(fc.setBatch(batch));
+    validateNoneSelected();
+  }
+
+  // Verifies that when OrcConf.ALLOW_PLUGIN_FILTER is false the configured
+  // plugin filter is ignored even though the file path matches its scope:
+  // only the SArg-derived filter is applied, so the resulting filter is not
+  // an AND combination and the selection reflects the SArg alone.
+  @Test
+  public void testPluginDisabled() {
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"a", "B", "c", "dE", "e", "f"});
+
+    // Define the plugin filter, but globally disable plugin filters
+    Configuration conf = new Configuration();
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(conf, false);
+    conf.set("my.filter.name", "my_str_i_eq");
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "g");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+
+    // SArg selects rows where f1 IN (2, 4, 6) -> row indices 1, 3, 5
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 2L, 4L, 6L)
+      .build();
+
+    // Setup Options with the SArg allowed to act as a row filter
+    Reader.Options opts = new Reader.Options(conf)
+      .searchArgument(sArg, new String[] {"f1"})
+      .allowSARGToFilter(true);
+    BatchFilter f = FilterFactory.createBatchFilter(opts,
+                                                    schema,
+                                                    false,
+                                                    OrcFile.Version.CURRENT,
+                                                    false,
+                                                    "file://db/table1/file1",
+                                                    conf);
+    // Plugin was suppressed, so no AND-combined filter was created
+    assertFalse(f instanceof BatchFilterFactory.AndBatchFilterImpl,
+               "Filter should not be an AND Batch filter");
+    f.accept(fc.setBatch(batch));
+    // Only the SArg filter ran: rows 1, 3, 5 remain selected
+    validateSelected(1, 3, 5);
+  }
+
+  // Verifies that a plugin filter whose scope pattern does not match the
+  // file path being read is not applied: the file lives under table2 while
+  // the scope covers table1, so only the SArg filter takes effect.
+  @Test
+  public void testPluginNonMatchingPath() {
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"a", "B", "c", "dE", "e", "f"});
+
+    // Define the plugin filter, scoped to files under table1 only
+    Configuration conf = new Configuration();
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(conf, true);
+    conf.set("my.filter.name", "my_str_i_eq");
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "g");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+
+    // SArg selects rows where f1 IN (2, 4, 6) -> row indices 1, 3, 5
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 2L, 4L, 6L)
+      .build();
+
+    // Setup Options; note the file path below is in table2, outside the scope
+    Reader.Options opts = new Reader.Options(conf)
+      .searchArgument(sArg, new String[] {"f1"})
+      .allowSARGToFilter(true);
+    BatchFilter f = FilterFactory.createBatchFilter(opts,
+                                                    schema,
+                                                    false,
+                                                    OrcFile.Version.CURRENT,
+                                                    false,
+                                                    "file://db/table2/file1",
+                                                    conf);
+    // Scope did not match, so the plugin filter was not AND-combined
+    assertFalse(f instanceof BatchFilterFactory.AndBatchFilterImpl,
+               "Filter should not be an AND Batch filter");
+    f.accept(fc.setBatch(batch));
+    // Only the SArg filter ran: rows 1, 3, 5 remain selected
+    validateSelected(1, 3, 5);
+  }
+
+  // Verifies SArg and plugin filters are AND-combined when both apply.
+  // The plugin performs a case-insensitive equality on f2 against "abcdef",
+  // matching every non-null string row; the SArg restricts f1 to (2, 4, 6),
+  // so the intersection leaves rows 1 and 5 selected.
+  @Test
+  public void testPluginSelectsAll() {
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"abcdef", "Abcdef", "aBcdef", null, "abcDef", "abcdEf"});
+
+    // Define the plugin filter (case-insensitive f2 == "abcdef")
+    Configuration conf = new Configuration();
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(conf, true);
+    conf.set("my.filter.name", "my_str_i_eq");
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "abcdef");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+
+    // SArg selects rows where f1 IN (2, 4, 6) -> row indices 1, 3, 5
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f1", PredicateLeaf.Type.LONG, 2L, 4L, 6L)
+      .build();
+
+    // Setup Options; file path matches the plugin scope, so both filters apply
+    Reader.Options opts = new Reader.Options(conf)
+      .searchArgument(sArg, new String[] {"f1"})
+      .allowSARGToFilter(true);
+    BatchFilter f = FilterFactory.createBatchFilter(opts,
+                                                    schema,
+                                                    false,
+                                                    OrcFile.Version.CURRENT,
+                                                    false,
+                                                    "file://db/table1/file1",
+                                                    conf);
+    // Both SArg and plugin filters were combined into an AND filter
+    assertTrue(f instanceof BatchFilterFactory.AndBatchFilterImpl,
+               "Filter should be an AND Batch filter");
+    f.accept(fc.setBatch(batch));
+    // Intersection of SArg rows (1, 3, 5) with plugin rows (0, 1, 2, 4, 5)
+    validateSelected(1, 5);
+  }
+
+  // Verifies that when the SArg and the plugin filter both reference the
+  // same column (f2), the combined filter reports that column only once in
+  // getColumnNames(), and both predicates are still AND-combined.
+  @Test
+  public void testPluginSameColumn() {
+    setBatch(new Long[] {1L, 2L, null, 4L, 5L, 6L},
+             new String[] {"abcdef", "Abcdef", "aBcdef", null, "abcDef", "abcdEf"});
+
+    // Define the plugin filter (case-insensitive f2 == "abcdef")
+    Configuration conf = new Configuration();
+    OrcConf.ALLOW_PLUGIN_FILTER.setBoolean(conf, true);
+    conf.set("my.filter.name", "my_str_i_eq");
+    conf.set("my.filter.col.name", "f2");
+    conf.set("my.filter.col.value", "abcdef");
+    conf.set("my.filter.scope", "file://db/table1/.*");
+
+    // SArg also targets f2: exact (case-sensitive) IN on two values
+    SearchArgument sArg = SearchArgumentFactory.newBuilder()
+      .in("f2", PredicateLeaf.Type.STRING, "Abcdef", "abcdEf")
+      .build();
+
+    // Setup Options; file path matches the plugin scope
+    Reader.Options opts = new Reader.Options(conf)
+      .searchArgument(sArg, new String[] {"f2"})
+      .allowSARGToFilter(true);
+
+    BatchFilter f = FilterFactory.createBatchFilter(opts,
+                                                    schema,
+                                                    false,
+                                                    OrcFile.Version.CURRENT,
+                                                    false,
+                                                    "file://db/table1/file1",
+                                                    conf);
+    // Both filters combined, but the shared column appears exactly once
+    assertTrue(f instanceof BatchFilterFactory.AndBatchFilterImpl,
+               "Filter should be an AND Batch filter");
+    assertArrayEquals(new String[] {"f2"}, f.getColumnNames());
+    f.accept(fc.setBatch(batch));
+    // SArg rows (1, 5) intersected with plugin rows (0, 1, 2, 4, 5) -> 1, 5
+    validateSelected(1, 5);
+  }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java
index 4184f02..e609829 100644
--- a/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java
+++ b/java/core/src/test/org/apache/orc/impl/filter/leaf/TestFilters.java
@@ -60,7 +60,8 @@ public class TestFilters extends ATestFilter {
                                               boolean normalize) {
     Reader.Options options = new Reader.Options().allowSARGToFilter(true);
     options.searchArgument(sArg, new String[0]);
-    return FilterFactory.createBatchFilter(options, readSchema, false, version, normalize);
+    return FilterFactory.createBatchFilter(options, readSchema, false,
+                                           version, normalize, null, null);
   }
 
   @Test
diff --git a/java/pom.xml b/java/pom.xml
index a5146c0..7175911 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -817,6 +817,12 @@
         <version>1.11.19</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>com.google.auto.service</groupId>
+        <artifactId>auto-service</artifactId>
+        <version>1.0</version>
+        <optional>true</optional>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 </project>

Reply via email to