Hi Doris dev team,

I’m observing that predicate pushdown is not working in the Doris Spark 
connector.

Environment
- Doris version: doris-2.1.6-rc04
- Spark version: 3.5.2
- Connector JARs tested: spark-doris-connector-spark-3.5-25.1.0.jar, spark-doris-connector-spark-3.5-25.2.0.jar, and spark-doris-connector-spark-3.5-25.3.0-SNAPSHOT.jar

Steps to reproduce
1. Run the standalone Spark job below (replace FE_HOST:PORT and credentials 
with any Doris FE endpoint that has demo.incremental_sales_int).
2. The job sets `doris.request.read_sql=true`, `doris.filter.pushdown=true`, and `doris.filter.query='id > 2 AND id <= 5'`.

Test program (inline):
```java
package com.example.doris;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DorisConnectorCheck {
    private static final String FE_NODES = "FE_HOST:PORT"; // replace with your FE
    private static final String DORIS_USER = "username";
    private static final String DORIS_PASSWORD = "password";
    private static final String DATABASE = "demo";
    private static final String TABLE = "incremental_sales_int";
    private static final String FILTER_PREDICATE = "id > 2 AND id <= 5";

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DorisConnectorCheck")
                .master(System.getenv().getOrDefault("SPARK_MASTER", "local[*]"))
                .config("spark.sql.shuffle.partitions", "1")
                .getOrCreate();

        spark.sparkContext().setLogLevel("INFO");

        try {
            System.out.printf("Connecting to Doris table %s.%s via FE %s%n",
                    DATABASE, TABLE, FE_NODES);
            System.out.printf("Predicate under test: %s%n", FILTER_PREDICATE);

            Dataset<Row> dataset = spark.read()
                    .format("doris")
                    .option("doris.table.identifier", DATABASE + "." + TABLE)
                    .option("doris.fenodes", FE_NODES)
                    .option("doris.user", DORIS_USER)
                    .option("doris.password", DORIS_PASSWORD)
                    .option("doris.request.read_sql", "true")
                    .option("doris.filter.pushdown", "true")
                    .option("doris.filter.query", FILTER_PREDICATE)
                    .load();

            dataset.explain(true);
            dataset.show(50, false);
            System.out.printf("Row count returned to Spark: %d%n", dataset.count());
        } finally {
            spark.stop();
        }
    }
}
```


Observed behavior (log excerpt):

```text
Connecting to Doris table demo.incremental_sales_int via FE FE_HOST:PORT
Predicate under test: id > 2 AND id <= 5
25/12/15 12:06:26 INFO ReaderPartitionGenerator: get query plan ..., sql: SELECT `id`,`age` FROM `demo`.`incremental_sales_int`
== Physical Plan ==
*(1) Project [id#0, age#1]
+- BatchScan demo.incremental_sales_int[id#0, age#1]

+---+---+
|id |age|
+---+---+
|1  |12 |
|3  |15 |
|5  |21 |
|6  |34 |
|2  |32 |
|4  |18 |
+---+---+

Row count returned to Spark: 6
```


Even though `doris.filter.query` is set, the planning SQL requested from the FE never includes the predicate: the ReaderPartitionGenerator log line above has no WHERE clause, and all six rows reach Spark unfiltered (the show() output and the count of 6 confirm this). This makes incremental reads impractical, because every micro-batch degenerates into a full table scan.
Expected result: the FE planning query includes the filter, so only the rows with 2 < id <= 5 (ids 3, 4, and 5) are returned.
Actual result: the FE-side query ignores doris.filter.query, and the scan returns every row in the table.
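
For reference, this is the planning SQL I would have expected ReaderPartitionGenerator to log, assuming the connector appends `doris.filter.query` as a WHERE clause (the exact quoting/formatting the connector emits may differ):

```sql
-- Hypothetical expected planning SQL with the predicate pushed down:
SELECT `id`,`age`
FROM `demo`.`incremental_sales_int`
WHERE `id` > 2 AND `id` <= 5;
```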
Please let me know if you need additional traces or configs. I’m happy to run 
more diagnostics.
Thanks,
Sweta Jain
Clear trail pvt ltd
