Hi Sweta
`doris.filter.query` is the configuration for RDDs. For DataFrames and SQL, 
the connector pushes predicates down automatically. That is, with DataFrames 
you can use `filter()`, and with SQL you can filter using a `WHERE` clause.
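
As a minimal sketch of both routes, reusing the table, options, and placeholder 
credentials from your test program (the class name and the temp view name are 
only illustrative, and I have not run this against your cluster):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DorisPushdownSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DorisPushdownSketch")
                .master("local[*]")
                .getOrCreate();

        try {
            // Same source options as the original job, minus doris.filter.query.
            // FE_HOST:PORT, username, and password are placeholders.
            Dataset<Row> sales = spark.read()
                    .format("doris")
                    .option("doris.table.identifier", "demo.incremental_sales_int")
                    .option("doris.fenodes", "FE_HOST:PORT")
                    .option("doris.user", "username")
                    .option("doris.password", "password")
                    .load();

            // DataFrame route: express the predicate with filter().
            Dataset<Row> filtered = sales.filter("id > 2 AND id <= 5");
            filtered.explain(true);
            filtered.show(50, false);

            // SQL route: register a temp view and use a WHERE clause.
            sales.createOrReplaceTempView("incremental_sales_int");
            Dataset<Row> viaSql = spark.sql(
                    "SELECT id, age FROM incremental_sales_int WHERE id > 2 AND id <= 5");
            viaSql.explain(true);
            viaSql.show(50, false);
        } finally {
            spark.stop();
        }
    }
}
```

If the predicate is pushed down, I would expect the `sql:` logged by 
ReaderPartitionGenerator to carry the condition instead of the bare 
`SELECT`; that is a quick way to check it on your side.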

You can refer to:
https://github.com/apache/doris-spark-connector/blob/master/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala#L398

--
Best,
Di Wu

On 2025/12/15 07:40:48 Sweta Jain via dev wrote:
> Hi Doris dev team,
> 
> I’m observing that predicate pushdown is not working in the Doris Spark 
> connector.
> 
> Environment
> - Doris version: doris-2.1.6-rc04
> - Spark version: 3.5.2
> - Connector jars tested: spark-doris-connector-spark-3.5-25.1.0.jar, 
> 25.2.0.jar, 25.3.0-SNAPSHOT.jar
> 
> Steps to reproduce
> 1. Run the standalone Spark job below (replace FE_HOST:PORT and credentials 
> with any Doris FE endpoint that has demo.incremental_sales_int).
> 2. The job sets `doris.request.read_sql=true`, `doris.filter.pushdown=true`, 
> and `doris.filter.query = 'id > 2 AND id <= 5'`.
> 
> Test program (inline):
> ```java
> package com.example.doris;
> 
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> 
> public class DorisConnectorCheck {
>     private static final String FE_NODES = "FE_HOST:PORT"; // replace with your FE
>     private static final String DORIS_USER = "username";
>     private static final String DORIS_PASSWORD = "password";
>     private static final String DATABASE = "demo";
>     private static final String TABLE = "incremental_sales_int";
>     private static final String FILTER_PREDICATE = "id > 2 AND id <= 5";
> 
>     public static void main(String[] args) {
>         SparkSession spark = SparkSession.builder()
>                 .appName("DorisConnectorCheck")
>                 .master(System.getenv().getOrDefault("SPARK_MASTER", 
> "local[*]"))
>                 .config("spark.sql.shuffle.partitions", "1")
>                 .getOrCreate();
> 
>         spark.sparkContext().setLogLevel("INFO");
> 
>         try {
>             System.out.printf("Connecting to Doris table %s.%s via FE %s%n", 
> DATABASE, TABLE, FE_NODES);
>             System.out.printf("Predicate under test: %s%n", FILTER_PREDICATE);
> 
>             Dataset<Row> dataset = spark.read()
>                     .format("doris")
>                     .option("doris.table.identifier", DATABASE + "." + TABLE)
>                     .option("doris.fenodes", FE_NODES)
>                     .option("doris.user", DORIS_USER)
>                     .option("doris.password", DORIS_PASSWORD)
>                     .option("doris.request.read_sql", "true")
>                     .option("doris.filter.pushdown", "true")
>                     .option("doris.filter.query", FILTER_PREDICATE)
>                     .load();
> 
>             dataset.explain(true);
>             dataset.show(50, false);
>             System.out.printf("Row count returned to Spark: %d%n", 
> dataset.count());
>         } finally {
>             spark.stop();
>         }
>     }
> }
> ```
> 
> 
> Observed behavior (log excerpt):
> 
> Connecting to Doris table demo.incremental_sales_int via FE FE_HOST:PORT
> Predicate under test: id > 2 AND id <= 5
> 25/12/15 12:06:26 INFO ReaderPartitionGenerator: get query plan ..., sql: 
> SELECT `id`,`age` FROM `demo`.`incremental_sales_int`
> == Physical Plan ==
> *(1) Project [id#0, age#1]
> +- BatchScan demo.incremental_sales_int[id#0, age#1]
> 
> +---+---+
> |id |age|
> +---+---+
> |1  |12 |
> |3  |15 |
> |5  |21 |
> |6  |34 |
> |2  |32 |
> |4  |18 |
> +---+---+
> 
> Row count returned to Spark: 6
> 
> 
> Even though doris.filter.query is set, the FE plan never includes the 
> predicate; Spark receives all rows and filters locally. This makes 
> incremental reads impossible because every micro-batch becomes a full table 
> scan.
> Expected result: the FE plan should include the filter and return only rows 
> with 2 < id <= 5.
> Actual result: the FE ignores doris.filter.query, so Spark performs the 
> filtering after pulling every row.
> Please let me know if you need additional traces or configs. I’m happy to run 
> more diagnostics.
> Thanks,
> Sweta Jain
> Clear trail pvt ltd
> 
> 
