[ https://issues.apache.org/jira/browse/SPARK-38288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496437#comment-17496437 ]
Hyukjin Kwon commented on SPARK-38288:
--------------------------------------
cc [~beliefer] FYI
> Aggregate push down doesn't work using Spark SQL JDBC datasource with
> PostgreSQL
> -------------------------------------------------------------------------------
>
> Key: SPARK-38288
> URL: https://issues.apache.org/jira/browse/SPARK-38288
> Project: Spark
> Issue Type: Question
> Components: SQL
> Affects Versions: 3.2.1
> Reporter: Luis Lozano Coira
> Priority: Major
> Labels: DataSource, Spark-SQL
>
> I am establishing a connection to PostgreSQL using the Spark SQL JDBC data
> source. I have started the Spark shell with the Postgres driver included, and
> I can connect and execute queries without problems. I am using this statement:
> {code:java}
> val df = spark.read.format("jdbc")
>   .option("url", "jdbc:postgresql://host:port/")
>   .option("driver", "org.postgresql.Driver")
>   .option("dbtable", "test")
>   .option("user", "postgres")
>   .option("password", "*******")
>   .option("pushDownAggregate", true)
>   .load()
> {code}
> I am adding the pushDownAggregate option because I would like the aggregations
> to be pushed down to the source, but for some reason this is not happening.
> Reviewing this pull request, it seems that this feature should already be
> available in 3.2: [https://github.com/apache/spark/pull/29695]
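> If I understand that pull request correctly, the aggregate push down it adds
> goes through the DataSource V2 JDBC path. Purely as an assumption on my side
> (the catalog name "pg" and the settings below are only illustrative, and the
> same properties could also be passed as --conf flags when starting the shell),
> the V2 path would be exercised by registering a JDBC catalog instead of using
> format("jdbc"):
> {code:java}
> // Sketch only: register a DataSource V2 JDBC catalog (catalog name "pg" is arbitrary)
> spark.conf.set("spark.sql.catalog.pg", "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
> spark.conf.set("spark.sql.catalog.pg.url", "jdbc:postgresql://host:port/")
> spark.conf.set("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
> spark.conf.set("spark.sql.catalog.pg.user", "postgres")
> spark.conf.set("spark.sql.catalog.pg.password", "*******")
> spark.conf.set("spark.sql.catalog.pg.pushDownAggregate", "true")
>
> // Read the table through the catalog rather than through format("jdbc")
> val dfp = spark.table("pg.test")
> {code}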
> I am writing the aggregations with the mentioned limitations in mind. An
> example where I do not see the pushdown happening is this one:
> {code:java}
> df.groupBy("name").max("age").show()
> {code}
> The results of the queryExecution are shown below:
> {code:java}
> scala> df.groupBy("name").max("age").queryExecution.executedPlan
> res19: org.apache.spark.sql.execution.SparkPlan =
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
>    +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
>       +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
>          +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>
>
> scala> dfp.groupBy("name").max("age").queryExecution.toString
> res20: String =
> "== Parsed Logical Plan ==
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#246] JDBCRelation(test) [numPartitions=1]
>
> == Analyzed Logical Plan ==
> name: string, max(age): int
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#24...
> {code}
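> As a quick sanity check (just a sketch, reusing the df defined above), the
> pushed-aggregate information reported by the scan node can also be inspected
> programmatically from the executed plan text; here it comes back empty, which
> confirms that nothing was pushed down:
> {code:java}
> // Sketch: look for the pushed-aggregate information in the physical plan text
> val planText = df.groupBy("name").max("age").queryExecution.executedPlan.toString
> println(planText.contains("PushedAggregates: []"))  // prints true here, i.e. no aggregate was pushed
> {code}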
> What could be the problem? Should pushDownAggregate work in this case?