This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bb082a5e4 [spark] Fix group by partial partition of a multi partition table (#6375)
3bb082a5e4 is described below

commit 3bb082a5e47268c31cac63be42dd0b9d7ea4b9b9
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Oct 10 13:41:04 2025 +0800

    [spark] Fix group by partial partition of a multi partition table (#6375)
---
 .../paimon/spark/aggregate/LocalAggregator.scala   | 15 ++++++------
 .../paimon/spark/sql/PushDownAggregatesTest.scala  | 27 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
index fb401c78e1..ae62c37c4f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
@@ -24,7 +24,8 @@ import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.stats.SimpleStatsEvolutions
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.source.DataSplit
-import org.apache.paimon.utils.{InternalRowUtils, ProjectedRow}
+import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.ProjectedRow
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
@@ -40,7 +41,8 @@ class LocalAggregator(table: FileStoreTable) {
   private val partitionType = SparkTypeUtils.toPartitionType(table)
   private val groupByEvaluatorMap = new mutable.HashMap[InternalRow, Seq[AggFuncEvaluator[_]]]()
   private var requiredGroupByType: Seq[DataType] = _
-  private var requiredGroupByIndexMapping: Seq[Int] = _
+  private var requiredGroupByIndexMapping: Array[Int] = _
+  private var requiredGroupByPaimonType: RowType = _
   private var aggFuncEvaluatorGetter: () => Seq[AggFuncEvaluator[_]] = _
   private var isInitialized = false
   private lazy val simpleStatsEvolutions = {
@@ -78,15 +80,14 @@ class LocalAggregator(table: FileStoreTable) {
         partitionType.getFieldIndex(r.fieldNames().head)
     }
 
+    requiredGroupByPaimonType = partitionType.project(requiredGroupByIndexMapping)
+
     isInitialized = true
   }
 
   private def requiredGroupByRow(partitionRow: BinaryRow): InternalRow = {
-    val projectedRow =
-      ProjectedRow.from(requiredGroupByIndexMapping.toArray).replaceRow(partitionRow)
-    // `ProjectedRow` does not support `hashCode`, so do a deep copy
-    val genericRow = InternalRowUtils.copyInternalRow(projectedRow, partitionType)
-    SparkInternalRow.create(partitionType).replace(genericRow)
+    val projectedRow = ProjectedRow.from(requiredGroupByIndexMapping).replaceRow(partitionRow)
+    SparkInternalRow.create(requiredGroupByPaimonType).replace(projectedRow)
   }
 
   def update(dataSplit: DataSplit): Unit = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
index a60d88aef9..af99a67b24 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala
@@ -267,4 +267,31 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
           })
       })
   }
+
+  test("Push down aggregate: group by partial partition of a multi partition table") {
+    sql(s"""
+           |CREATE TABLE T (
+           |c1 STRING,
+           |c2 STRING,
+           |c3 STRING,
+           |c4 STRING,
+           |c5 DATE)
+           |PARTITIONED BY (c5, c1)
+           |TBLPROPERTIES ('primary-key' = 'c5, c1, c3')
+           |""".stripMargin)
+
+    sql("INSERT INTO T VALUES ('t1', 'k1', 'v1', 'r1', '2025-01-01')")
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM T GROUP BY c1"),
+      Seq(Row(1))
+    )
+    checkAnswer(
+      sql("SELECT c1, COUNT(*) FROM T GROUP BY c1"),
+      Seq(Row("t1", 1))
+    )
+    checkAnswer(
+      sql("SELECT COUNT(*), c1 FROM T GROUP BY c1"),
+      Seq(Row(1, "t1"))
+    )
+  }
 }

Reply via email to