This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 302c017 [SPARK-38542][SQL] UnsafeHashedRelation should serialize
numKeys out
302c017 is described below
commit 302c01708ebd1ab8c9bdf1bf46be11ade4d44a3c
Author: mcdull-zhang <[email protected]>
AuthorDate: Wed Mar 16 14:17:18 2022 +0800
[SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
### What changes were proposed in this pull request?
UnsafeHashedRelation should serialize numKeys out
### Why are the changes needed?
One case I found was this:
We turned on ReusedExchange(BroadcastExchange), but the returned
UnsafeHashedRelation is missing numKeys.
The reason is that the current type of TorrentBroadcast._value is
SoftReference, so the broadcast value can be garbage-collected and must then be
re-deserialized on access; because numKeys was never written out during
serialization, the re-deserialized UnsafeHashedRelation loses numKeys, which
will lead to incorrect calculation results.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added an assertion to an existing unit test
Closes #35836 from mcdull-zhang/UnsafeHashed.
Authored-by: mcdull-zhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8476c8b846ffd2622a6bcf1accf9fa55ffbdc0db)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 4 +++-
.../org/apache/spark/sql/execution/joins/HashedRelationSuite.scala | 3 +++
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 80f2143..f37aea6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -197,7 +197,7 @@ private[execution] class ValueRowWithKeyIndex {
* A HashedRelation for UnsafeRow, which is backed BytesToBytesMap.
*
* It's serialized in the following format:
- * [number of keys]
+ * [number of keys] [number of fields]
* [size of key] [size of value] [key bytes] [bytes for value]
*/
private[joins] class UnsafeHashedRelation(
@@ -352,6 +352,7 @@ private[joins] class UnsafeHashedRelation(
writeInt: (Int) => Unit,
writeLong: (Long) => Unit,
writeBuffer: (Array[Byte], Int, Int) => Unit) : Unit = {
+ writeInt(numKeys)
writeInt(numFields)
// TODO: move these into BytesToBytesMap
writeLong(binaryMap.numKeys())
@@ -385,6 +386,7 @@ private[joins] class UnsafeHashedRelation(
readInt: () => Int,
readLong: () => Long,
readBuffer: (Array[Byte], Int, Int) => Unit): Unit = {
+ numKeys = readInt()
numFields = readInt()
resultRow = new UnsafeRow(numFields)
val nKeys = readLong()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 0a364ba..d1e8ebe 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -93,6 +93,9 @@ class HashedRelationSuite extends SharedSparkSession {
assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
assert(hashed2.get(unsafeData(2)).toArray === data2)
+ // SPARK-38542: UnsafeHashedRelation should serialize numKeys out
+ assert(hashed2.keys().map(_.copy()).forall(_.numFields == 1))
+
val os2 = new ByteArrayOutputStream()
val out2 = new ObjectOutputStream(os2)
hashed2.asInstanceOf[UnsafeHashedRelation].writeExternal(out2)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]