This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-mongodb.git


The following commit(s) were added to refs/heads/main by this push:
     new 424d57a  [FLINK-34216][connectors/mongodb] FLIP-377: Support 
fine-grained configuration to control filter push down for MongoDB Connector 
(#23)
424d57a is described below

commit 424d57a54c69d9d0e0fde4219e46bf5fcf457b59
Author: Jiabao Sun <[email protected]>
AuthorDate: Tue Aug 6 09:41:39 2024 +0800

    [FLINK-34216][connectors/mongodb] FLIP-377: Support fine-grained 
configuration to control filter push down for MongoDB Connector (#23)
---
 docs/content.zh/docs/connectors/table/mongodb.md   | 13 ++++
 docs/content/docs/connectors/table/mongodb.md      | 14 +++++
 .../mongodb/table/FilterHandlingPolicy.java        | 33 ++++++++++
 .../mongodb/table/MongoConnectorOptions.java       |  7 +++
 .../mongodb/table/MongoDynamicTableFactory.java    |  3 +
 .../mongodb/table/MongoDynamicTableSource.java     | 57 ++++++++++-------
 .../mongodb/table/config/MongoConfiguration.java   |  6 ++
 .../table/MongoDynamicTableFactoryTest.java        |  5 ++
 .../table/MongoDynamicTableSourceITCase.java       | 11 +++-
 .../mongodb/table/MongoTablePlanTest.java          | 71 +++++++++++++++-------
 .../connector/mongodb/table/MongoTablePlanTest.xml | 18 ++++++
 11 files changed, 193 insertions(+), 45 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/mongodb.md 
b/docs/content.zh/docs/connectors/table/mongodb.md
index dde8363..1c078f0 100644
--- a/docs/content.zh/docs/connectors/table/mongodb.md
+++ b/docs/content.zh/docs/connectors/table/mongodb.md
@@ -231,6 +231,19 @@ ON myTopic.key = MyUserTable._id;
       <td>Duration</td>
       <td>查询数据库失败的最大重试时间。</td>
     </tr>
+    <tr>
+      <td><h5>filter.handling.policy</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">always</td>
+      <td>枚举值,可选项: always, never</td>
+      <td>过滤器下推策略,支持的策略有:
+          <ul>
+            <li><code>always</code>: 始终将支持的过滤器下推到数据库。</li>
+            <li><code>never</code>: 不将任何过滤器下推到数据库。</li>
+          </ul>
+      </td>
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>可选</td>
diff --git a/docs/content/docs/connectors/table/mongodb.md 
b/docs/content/docs/connectors/table/mongodb.md
index 340a925..8fcd35c 100644
--- a/docs/content/docs/connectors/table/mongodb.md
+++ b/docs/content/docs/connectors/table/mongodb.md
@@ -236,6 +236,20 @@ Connector Options
       <td>Duration</td>
       <td>Specifies the retry time interval if lookup records from database 
failed.</td>
     </tr>
+    <tr>
+      <td><h5>filter.handling.policy</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">always</td>
+      <td>Enum. Possible values: always, never</td>
+      <td>Fine-grained configuration to control filter push down. 
+          Supported policies are:
+          <ul>
+            <li><code>always</code>: Always push the supported filters to 
MongoDB.</li>
+            <li><code>never</code>: Never push any filters to MongoDB.</li>
+          </ul>
+      </td>
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/FilterHandlingPolicy.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/FilterHandlingPolicy.java
new file mode 100644
index 0000000..7e12e02
--- /dev/null
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/FilterHandlingPolicy.java
@@ -0,0 +1,33 @@
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Fine-grained configuration to control filter push down for MongoDB 
Table/SQL source. */
+@PublicEvolving
+public enum FilterHandlingPolicy implements DescribedEnum {
+    ALWAYS("always", text("Always push the supported filters to MongoDB.")),
+
+    NEVER("never", text("Never push any filters to MongoDB."));
+
+    private final String name;
+    private final InlineElement description;
+
+    FilterHandlingPolicy(String name, InlineElement description) {
+        this.name = name;
+        this.description = description;
+    }
+
+    @Override
+    public InlineElement getDescription() {
+        return description;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
index a91c489..a579eaa 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoConnectorOptions.java
@@ -139,4 +139,11 @@ public class MongoConnectorOptions {
                     .defaultValue(Duration.ofMillis(1000L))
                     .withDescription(
                             "Specifies the retry time interval if writing 
records to database failed.");
+
+    public static final ConfigOption<FilterHandlingPolicy> 
FILTER_HANDLING_POLICY =
+            ConfigOptions.key("filter.handling.policy")
+                    .enumType(FilterHandlingPolicy.class)
+                    .defaultValue(FilterHandlingPolicy.ALWAYS)
+                    .withDescription(
+                            "Fine-grained configuration to control filter push 
down for MongoDB Table/SQL source.");
 }
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
index 59b8ba5..055c6b7 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java
@@ -43,6 +43,7 @@ import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUF
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
@@ -99,6 +100,7 @@ public class MongoDynamicTableFactory
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
         optionalOptions.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+        optionalOptions.add(FILTER_HANDLING_POLICY);
         return optionalOptions;
     }
 
@@ -132,6 +134,7 @@ public class MongoDynamicTableFactory
                 getLookupCache(options),
                 config.getLookupMaxRetries(),
                 config.getLookupRetryIntervalMs(),
+                config.getFilterHandlingPolicy(),
                 context.getPhysicalRowDataType());
     }
 
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
index a20f99a..229fb17 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -74,6 +75,7 @@ public class MongoDynamicTableSource
     @Nullable private final LookupCache lookupCache;
     private final int lookupMaxRetries;
     private final long lookupRetryIntervalMs;
+    private final FilterHandlingPolicy filterHandlingPolicy;
     private DataType producedDataType;
     private int limit = -1;
 
@@ -85,6 +87,7 @@ public class MongoDynamicTableSource
             @Nullable LookupCache lookupCache,
             int lookupMaxRetries,
             long lookupRetryIntervalMs,
+            FilterHandlingPolicy filterHandlingPolicy,
             DataType producedDataType) {
         this.connectionOptions = connectionOptions;
         this.readOptions = readOptions;
@@ -99,6 +102,7 @@ public class MongoDynamicTableSource
                 String.format("The '%s' must be larger than 0.", 
LOOKUP_RETRY_INTERVAL.key()));
         this.lookupMaxRetries = lookupMaxRetries;
         this.lookupRetryIntervalMs = lookupRetryIntervalMs;
+        this.filterHandlingPolicy = filterHandlingPolicy;
         this.producedDataType = producedDataType;
     }
 
@@ -170,6 +174,7 @@ public class MongoDynamicTableSource
                         lookupCache,
                         lookupMaxRetries,
                         lookupRetryIntervalMs,
+                        filterHandlingPolicy,
                         producedDataType);
         newSource.filter = BsonDocument.parse(filter.toJson());
         return newSource;
@@ -198,28 +203,36 @@ public class MongoDynamicTableSource
 
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
-        List<ResolvedExpression> acceptedFilters = new ArrayList<>();
-        List<ResolvedExpression> remainingFilters = new ArrayList<>();
+        switch (filterHandlingPolicy) {
+            case NEVER:
+                return Result.of(Collections.emptyList(), filters);
+            case ALWAYS:
+            default:
+                List<ResolvedExpression> acceptedFilters = new ArrayList<>();
+                List<ResolvedExpression> remainingFilters = new ArrayList<>();
 
-        List<Bson> mongoFilters = new ArrayList<>();
-        for (ResolvedExpression filter : filters) {
-            BsonDocument simpleFilter = parseFilter(filter);
-            if (simpleFilter.isEmpty()) {
-                remainingFilters.add(filter);
-            } else {
-                acceptedFilters.add(filter);
-                mongoFilters.add(simpleFilter);
-            }
-        }
+                List<Bson> mongoFilters = new ArrayList<>();
+                for (ResolvedExpression filter : filters) {
+                    BsonDocument simpleFilter = parseFilter(filter);
+                    if (simpleFilter.isEmpty()) {
+                        remainingFilters.add(filter);
+                    } else {
+                        acceptedFilters.add(filter);
+                        mongoFilters.add(simpleFilter);
+                    }
+                }
 
-        if (!mongoFilters.isEmpty()) {
-            Bson mergedFilter =
-                    mongoFilters.size() == 1 ? mongoFilters.get(0) : 
Filters.and(mongoFilters);
-            this.filter = mergedFilter.toBsonDocument();
-            LOG.info("Pushed down filters: {}", filter.toJson());
-        }
+                if (!mongoFilters.isEmpty()) {
+                    Bson mergedFilter =
+                            mongoFilters.size() == 1
+                                    ? mongoFilters.get(0)
+                                    : Filters.and(mongoFilters);
+                    this.filter = mergedFilter.toBsonDocument();
+                    LOG.info("Pushed down filters: {}", filter.toJson());
+                }
 
-        return Result.of(acceptedFilters, remainingFilters);
+                return Result.of(acceptedFilters, remainingFilters);
+        }
     }
 
     static BsonDocument parseFilter(ResolvedExpression filter) {
@@ -244,7 +257,8 @@ public class MongoDynamicTableSource
                 && Objects.equals(filter, that.filter)
                 && Objects.equals(lookupCache, that.lookupCache)
                 && Objects.equals(lookupMaxRetries, that.lookupMaxRetries)
-                && Objects.equals(lookupRetryIntervalMs, 
that.lookupRetryIntervalMs);
+                && Objects.equals(lookupRetryIntervalMs, 
that.lookupRetryIntervalMs)
+                && Objects.equals(filterHandlingPolicy, 
that.filterHandlingPolicy);
     }
 
     @Override
@@ -257,6 +271,7 @@ public class MongoDynamicTableSource
                 filter,
                 lookupCache,
                 lookupMaxRetries,
-                lookupRetryIntervalMs);
+                lookupRetryIntervalMs,
+                filterHandlingPolicy);
     }
 }
diff --git 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
index c141ca2..753b429 100644
--- 
a/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
+++ 
b/flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/config/MongoConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import 
org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
+import org.apache.flink.connector.mongodb.table.FilterHandlingPolicy;
 import org.apache.flink.table.connector.source.lookup.LookupOptions;
 
 import javax.annotation.Nullable;
@@ -33,6 +34,7 @@ import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUF
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
@@ -97,6 +99,10 @@ public class MongoConfiguration {
         return config.get(LOOKUP_RETRY_INTERVAL).toMillis();
     }
 
+    public FilterHandlingPolicy getFilterHandlingPolicy() {
+        return config.get(FILTER_HANDLING_POLICY);
+    }
+
     // -----------------------------------Write 
Config------------------------------------------
     public int getBufferFlushMaxRows() {
         return config.get(BUFFER_FLUSH_MAX_ROWS);
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
index c9af0fc..3dae596 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactoryTest.java
@@ -46,6 +46,7 @@ import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.BUF
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DELIVERY_GUARANTEE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.LOOKUP_RETRY_INTERVAL;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_CURSOR_NO_TIMEOUT;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.SCAN_FETCH_SIZE;
@@ -89,6 +90,7 @@ public class MongoDynamicTableFactoryTest {
                         null,
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+                        FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());
         assertThat(actualSource).isEqualTo(expectedSource);
     }
@@ -115,6 +117,7 @@ public class MongoDynamicTableFactoryTest {
         properties.put(SCAN_PARTITION_STRATEGY.key(), "split-vector");
         properties.put(SCAN_PARTITION_SIZE.key(), "128m");
         properties.put(SCAN_PARTITION_SAMPLES.key(), "5");
+        properties.put(FILTER_HANDLING_POLICY.key(), "never");
 
         DynamicTableSource actual = createTableSource(SCHEMA, properties);
 
@@ -135,6 +138,7 @@ public class MongoDynamicTableFactoryTest {
                         null,
                         LookupOptions.MAX_RETRIES.defaultValue(),
                         LOOKUP_RETRY_INTERVAL.defaultValue().toMillis(),
+                        FilterHandlingPolicy.NEVER,
                         SCHEMA.toPhysicalRowDataType());
 
         assertThat(actual).isEqualTo(expected);
@@ -162,6 +166,7 @@ public class MongoDynamicTableFactoryTest {
                         
DefaultLookupCache.fromConfig(Configuration.fromMap(properties)),
                         10,
                         20,
+                        FILTER_HANDLING_POLICY.defaultValue(),
                         SCHEMA.toPhysicalRowDataType());
 
         assertThat(actual).isEqualTo(expected);
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
index 74e2de1..062b374 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java
@@ -81,6 +81,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.FILTER_HANDLING_POLICY;
 import static 
org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -241,9 +242,13 @@ class MongoDynamicTableSourceITCase {
         }
     }
 
-    @Test
-    void testFilter() {
-        tEnv.executeSql(createTestDDl(null));
+    @ParameterizedTest
+    @EnumSource(FilterHandlingPolicy.class)
+    void testFilter(FilterHandlingPolicy filterHandlingPolicy) {
+        tEnv.executeSql(
+                createTestDDl(
+                        Collections.singletonMap(
+                                FILTER_HANDLING_POLICY.key(), 
filterHandlingPolicy.name())));
 
         // we create a VIEW here to test column remapping, i.e. would filter 
push down work if we
         // create a view that depends on our source table
diff --git 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
index 133cbe7..4636d60 100644
--- 
a/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
+++ 
b/flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.java
@@ -17,7 +17,10 @@
 
 package org.apache.flink.connector.mongodb.table;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -28,55 +31,81 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.rules.TestName;
 
 import java.time.ZoneId;
+import java.util.Collections;
+import java.util.Map;
 
 /** Plan tests for Mongo connector, for example, testing projection push down. 
*/
-public class MongoTablePlanTest extends TableTestBase {
+class MongoTablePlanTest extends TableTestBase {
 
     private final StreamTableTestUtil util = 
streamTestUtil(TableConfig.getDefault());
 
     private TestInfo testInfo;
 
     @BeforeEach
-    public void setup(TestInfo testInfo) {
+    void setup(TestInfo testInfo) {
         this.testInfo = testInfo;
         TableEnvironment tEnv = util.tableEnv();
         tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
-        tEnv.executeSql(
-                "CREATE TABLE mongo ("
-                        + "id BIGINT,"
-                        + "description VARCHAR(200),"
-                        + "boolean_col BOOLEAN,"
-                        + "timestamp_col TIMESTAMP_LTZ(0),"
-                        + "timestamp3_col TIMESTAMP_LTZ(3),"
-                        + "int_col INTEGER,"
-                        + "double_col DOUBLE,"
-                        + "decimal_col DECIMAL(10, 4)"
-                        + ") WITH ("
-                        + "  'connector'='mongodb',"
-                        + "  'uri'='mongodb://127.0.0.1:27017',"
-                        + "  'database'='test_db',"
-                        + "  'collection'='test_coll'"
-                        + ")");
     }
 
     @Test
-    public void testFilterPushdown() {
+    void testFilterPushdown() {
+        createTestTable();
         util.verifyExecPlan(
                 "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 
900001 AND timestamp3_col <> TIMESTAMP '2022-09-07 10:25:28.127' OR double_col 
>= -1000.23");
     }
 
     @Test
-    public void testFilterPartialPushdown() {
+    void testFilterPartialPushdown() {
+        createTestTable();
         util.verifyExecPlan(
                 "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 
900001 AND boolean_col = (decimal_col > 2.0)");
     }
 
     @Test
-    public void testFilterCannotPushdown() {
+    void testFilterCannotPushdown() {
+        createTestTable();
         util.verifyExecPlan(
                 "SELECT id, timestamp3_col, int_col FROM mongo WHERE id IS NOT 
NULL OR double_col = decimal_col");
     }
 
+    @Test
+    void testNeverFilterPushdown() {
+        createTestTable(
+                Collections.singletonMap(
+                        MongoConnectorOptions.FILTER_HANDLING_POLICY.key(),
+                        FilterHandlingPolicy.NEVER.name()));
+        util.verifyExecPlan(
+                "SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 
900001 AND decimal_col > 1.0");
+    }
+
+    private void createTestTable() {
+        createTestTable(Collections.emptyMap());
+    }
+
+    private void createTestTable(Map<String, String> extraOptions) {
+        TableDescriptor.Builder builder =
+                TableDescriptor.forConnector("mongodb")
+                        .option("uri", "mongodb://127.0.0.1:27017")
+                        .option("database", "test_db")
+                        .option("collection", "test_coll")
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("id", DataTypes.BIGINT())
+                                        .column("description", 
DataTypes.VARCHAR(200))
+                                        .column("boolean_col", 
DataTypes.BOOLEAN())
+                                        .column("timestamp_col", 
DataTypes.TIMESTAMP_LTZ(0))
+                                        .column("timestamp3_col", 
DataTypes.TIMESTAMP_LTZ(3))
+                                        .column("int_col", DataTypes.INT())
+                                        .column("double_col", 
DataTypes.DOUBLE())
+                                        .column("decimal_col", 
DataTypes.DECIMAL(10, 4))
+                                        .build());
+
+        extraOptions.forEach(builder::option);
+
+        util.tableEnv().createTable("mongo", builder.build());
+    }
+
     // A workaround to get the test method name for flink versions not 
completely migrated to JUnit5
     public TestName name() {
         return new TestName() {
diff --git 
a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
 
b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
index b2c57e4..b40cb1f 100644
--- 
a/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
+++ 
b/flink-connector-mongodb/src/test/resources/org/apache/flink/connector/mongodb/table/MongoTablePlanTest.xml
@@ -66,6 +66,24 @@ LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
       <![CDATA[
 Calc(select=[id, timestamp3_col, int_col], where=[(id IS NOT NULL OR 
(double_col = CAST(decimal_col AS DOUBLE)))])
 +- TableSourceScan(table=[[default_catalog, default_database, mongo, 
filter=[], project=[id, timestamp3_col, int_col, double_col, decimal_col]]], 
fields=[id, timestamp3_col, int_col, double_col, decimal_col])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testNeverFilterPushdown">
+    <Resource name="sql">
+      <![CDATA[SELECT id, timestamp3_col, int_col FROM mongo WHERE id = 900001 
AND decimal_col > 1.0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], timestamp3_col=[$4], int_col=[$5])
++- LogicalFilter(condition=[AND(=($0, 900001), >($7, 1.0:DECIMAL(2, 1)))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, mongo]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[CAST(900001 AS BIGINT) AS id, timestamp3_col, int_col], 
where=[((id = 900001) AND (decimal_col > 1.0))])
++- TableSourceScan(table=[[default_catalog, default_database, mongo, 
filter=[], project=[id, timestamp3_col, int_col, decimal_col]]], fields=[id, 
timestamp3_col, int_col, decimal_col])
 ]]>
     </Resource>
   </TestCase>

Reply via email to