This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e4fe9baec [GLUTEN-5136][VL] Duplicated output from Spark-to-Velox broadcast relation conversion (#5141)
e4fe9baec is described below
commit e4fe9baeccde07e2938d5f186151c43591e91720
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Mar 27 12:54:29 2024 +0800
[GLUTEN-5136][VL] Duplicated output from Spark-to-Velox broadcast relation conversion (#5141)
---
.../apache/spark/sql/execution/BroadcastUtils.scala | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
index a0f28c5ab..ad7694ea2 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, IdentityBroadcastMode, Partitioning}
-import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode}
+import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode, LongHashedRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.TaskResources
@@ -96,9 +96,8 @@ object BroadcastUtils {
// HashedRelation to ColumnarBuildSideRelation.
val fromBroadcast = from.asInstanceOf[Broadcast[HashedRelation]]
val fromRelation = fromBroadcast.value.asReadOnlyCopy()
- val keys = fromRelation.keys()
val toRelation = TaskResources.runUnsafe {
-      val batchItr: Iterator[ColumnarBatch] = fn(keys.flatMap(key => fromRelation.get(key)))
+      val batchItr: Iterator[ColumnarBatch] = fn(reconstructRows(fromRelation))
      val serialized: Array[Array[Byte]] = serializeStream(batchItr) match {
case ColumnarBatchSerializeResult.EMPTY =>
Array()
@@ -170,4 +169,17 @@ object BroadcastUtils {
}
serializeResult
}
+
+  private def reconstructRows(relation: HashedRelation): Iterator[InternalRow] = {
+    // It seems that LongHashedRelation and UnsafeHashedRelation don't follow the same
+ // criteria while getting values from them.
+ // Should review the internals of this part of code.
+ relation match {
+ case relation: LongHashedRelation if relation.keyIsUnique =>
+ relation.keys().map(k => relation.getValue(k))
+ case relation: LongHashedRelation if !relation.keyIsUnique =>
+ relation.keys().flatMap(k => relation.get(k))
+ case other => other.valuesWithKeyIndex().map(_.getValue)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]