Repository: spark Updated Branches: refs/heads/master 3e831a269 -> 74335b310
[SPARK-5707] [SQL] fix serialization of generated projection Author: Davies Liu <dav...@databricks.com> Closes #7272 from davies/fix_projection and squashes the following commits: 075ef76 [Davies Liu] fix codegen with BroadcastHashJoin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74335b31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74335b31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74335b31 Branch: refs/heads/master Commit: 74335b31072951244967f878d8b766cd1bfc2ac6 Parents: 3e831a2 Author: Davies Liu <dav...@databricks.com> Authored: Wed Jul 8 10:43:00 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed Jul 8 10:43:00 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala | 3 +-- .../org/apache/spark/sql/execution/joins/HashOuterJoin.scala | 2 +- .../org/apache/spark/sql/execution/joins/HashedRelation.scala | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala index 06c244f..ab757fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala @@ -79,8 +79,7 @@ case class BroadcastHashOuterJoin( // Note that we use .execute().collect() because we don't want to convert data to Scala types val input: Array[InternalRow] = 
buildPlan.execute().map(_.copy()).collect() // buildHashTable uses code-generated rows as keys, which are not serializable - val hashed = - buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output)) + val hashed = buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)) sparkContext.broadcast(hashed) }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext) http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 3337451..0522ee8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -171,7 +171,7 @@ override def outputPartitioning: Partitioning = joinType match { var existingMatchList = hashTable.get(rowKey) if (existingMatchList == null) { existingMatchList = new CompactBuffer[InternalRow]() - hashTable.put(rowKey, existingMatchList) + hashTable.put(rowKey.copy(), existingMatchList) } existingMatchList += currentRow.copy() http://git-wip-us.apache.org/repos/asf/spark/blob/74335b31/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index de062c7..6b51f5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -125,7 +125,7 @@ private[joins] object 
HashedRelation { val existingMatchList = hashTable.get(rowKey) val matchList = if (existingMatchList == null) { val newMatchList = new CompactBuffer[InternalRow]() - hashTable.put(rowKey, newMatchList) + hashTable.put(rowKey.copy(), newMatchList) newMatchList } else { keyIsUnique = false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org