Hi Sweta, `doris.filter.query` is the configuration for RDDs. For DataFrames and Spark SQL, the connector pushes predicates down automatically: DataFrames can use `filter()`, and SQL queries can use a `WHERE` clause.
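For example, something like the following (a minimal sketch reusing the FE placeholder, credentials, and table from your test program quoted below; the class name is just for illustration):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DorisPushdownSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DorisPushdownSketch")
                .master("local[*]")
                .getOrCreate();

        // Load the table without doris.filter.query; the predicate is
        // expressed on the DataFrame / in SQL instead.
        Dataset<Row> df = spark.read()
                .format("doris")
                .option("doris.table.identifier", "demo.incremental_sales_int")
                .option("doris.fenodes", "FE_HOST:PORT") // replace with your FE
                .option("doris.user", "username")
                .option("doris.password", "password")
                .load();

        // DataFrame route: the connector translates this filter into the
        // Doris scan, so only matching rows leave Doris.
        df.filter("id > 2 AND id <= 5").show(50, false);

        // SQL route: a WHERE clause on a temp view is pushed down the same way.
        df.createOrReplaceTempView("incremental_sales_int");
        spark.sql("SELECT id, age FROM incremental_sales_int WHERE id > 2 AND id <= 5")
                .show(50, false);

        spark.stop();
    }
}
```

If the pushdown takes effect, the `ReaderPartitionGenerator` log line should show the predicate in the SQL it reports, rather than the bare SELECT in your excerpt.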
You can refer to: https://github.com/apache/doris-spark-connector/blob/master/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala#L398

--
Best,
Di Wu

On 2025/12/15 07:40:48 Sweta Jain via dev wrote:
> Hi Doris dev team,
>
> I’m observing that predicate pushdown is not working in the Doris Spark connector.
>
> Environment
> - Doris version: doris-2.1.6-rc04
> - Spark version: 3.5.2
> - Connector jars tested: spark-doris-connector-spark-3.5-25.1.0.jar, 25.2.0.jar, 25.3.0-SNAPSHOT.jar
>
> Steps to reproduce
> 1. Run the standalone Spark job below (replace FE_HOST:PORT and credentials with any Doris FE endpoint that has demo.incremental_sales_int).
> 2. The job sets `doris.request.read_sql=true`, `doris.filter.pushdown=true`, and `doris.filter.query = 'id > 2 AND id <= 5'`.
>
> Test program (inline):
> ```java
> package com.example.doris;
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
>
> public class DorisConnectorCheck {
>     private static final String FE_NODES = "FE_HOST:PORT"; // replace with your FE
>     private static final String DORIS_USER = "username";
>     private static final String DORIS_PASSWORD = "password";
>     private static final String DATABASE = "demo";
>     private static final String TABLE = "incremental_sales_int";
>     private static final String FILTER_PREDICATE = "id > 2 AND id <= 5";
>
>     public static void main(String[] args) {
>         SparkSession spark = SparkSession.builder()
>                 .appName("DorisConnectorCheck")
>                 .master(System.getenv().getOrDefault("SPARK_MASTER", "local[*]"))
>                 .config("spark.sql.shuffle.partitions", "1")
>                 .getOrCreate();
>
>         spark.sparkContext().setLogLevel("INFO");
>
>         try {
>             System.out.printf("Connecting to Doris table %s.%s via FE %s%n", DATABASE, TABLE, FE_NODES);
>             System.out.printf("Predicate under test: %s%n", FILTER_PREDICATE);
>
>             Dataset<Row> dataset = spark.read()
>                     .format("doris")
>                     .option("doris.table.identifier", DATABASE + "." + TABLE)
>                     .option("doris.fenodes", FE_NODES)
>                     .option("doris.user", DORIS_USER)
>                     .option("doris.password", DORIS_PASSWORD)
>                     .option("doris.request.read_sql", "true")
>                     .option("doris.filter.pushdown", "true")
>                     .option("doris.filter.query", FILTER_PREDICATE)
>                     .load();
>
>             dataset.explain(true);
>             dataset.show(50, false);
>             System.out.printf("Row count returned to Spark: %d%n", dataset.count());
>         } finally {
>             spark.stop();
>         }
>     }
> }
> ```
>
> Observed behavior (log excerpt):
>
> Connecting to Doris table demo.incremental_sales_int via FE FE_HOST:PORT
> Predicate under test: id > 2 AND id <= 5
> 25/12/15 12:06:26 INFO ReaderPartitionGenerator: get query plan ..., sql: SELECT `id`,`age` FROM `demo`.`incremental_sales_int`
> == Physical Plan ==
> *(1) Project [id#0, age#1]
> +- BatchScan demo.incremental_sales_int[id#0, age#1]
>
> +---+---+
> |id |age|
> +---+---+
> |1  |12 |
> |3  |15 |
> |5  |21 |
> |6  |34 |
> |2  |32 |
> |4  |18 |
> +---+---+
>
> Row count returned to Spark: 6
>
> Even though doris.filter.query is set, the FE plan never includes the predicate; Spark receives all rows and filters locally. This makes incremental reads impossible because every micro-batch becomes a full table scan.
> Expected result: the FE plan should include the filter and return only rows with 2 < id <= 5.
> Actual result: the FE ignores doris.filter.query, so Spark performs the filtering after pulling every row.
> Please let me know if you need additional traces or configs.
> I’m happy to run more diagnostics.
>
> Thanks,
> Sweta Jain
> Clear trail pvt ltd
