This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch issue-620-0.2 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit b0499eee97f88ced45e72875f6ae5107c64c5f63 Author: xiyu.zk <[email protected]> AuthorDate: Sat Jun 3 15:39:52 2023 +0800 [CELEBORN-620] Fix columnar shuffle codegen exception --- .../columnar/RssColumnarBatchCodeGenBuild.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/sql/execution/columnar/RssColumnarBatchCodeGenBuild.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/sql/execution/columnar/RssColumnarBatchCodeGenBuild.scala index b07a3db09..0f6d5c2fc 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/sql/execution/columnar/RssColumnarBatchCodeGenBuild.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/sql/execution/columnar/RssColumnarBatchCodeGenBuild.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.columnar +import java.io.ByteArrayOutputStream import java.nio.ByteBuffer import scala.collection.mutable @@ -57,13 +58,12 @@ class RssColumnarBatchCodeGenBuild { | } | | public byte[] buildColumnBytes() throws Exception { - | int offset = 0; - | byte[] giantBuffer = new byte[totalSize]; + | ${classOf[ByteArrayOutputStream].getName} giantBuffer = new ${classOf[ + ByteArrayOutputStream].getName}(); | byte[] rowCntBytes = int2ByteArray(rowCnt); - | System.arraycopy(rowCntBytes, 0, giantBuffer, offset, rowCntBytes.length); - | offset += 4; + | giantBuffer.write(rowCntBytes); | ${codes._3} - | return giantBuffer; + | return giantBuffer.toByteArray(); | } | | public void writeRow(InternalRow row) throws Exception { @@ -297,10 +297,8 @@ class RssColumnarBatchCodeGenBuild { | ${classOf[ByteBuffer].getName} buffers$index = b$index.build(); | byte[] bytes$index = ${classOf[JavaUtils].getName}.bufferToArray(buffers$index); | byte[] columnBuilderBytes$index = int2ByteArray(bytes$index.length); - | System.arraycopy(columnBuilderBytes$index, 0, giantBuffer, offset, columnBuilderBytes$index.length); - | offset += 4; - | System.arraycopy(bytes$index, 0, giantBuffer, offset, bytes$index.length); - | offset += bytes$index.length; + | giantBuffer.write(columnBuilderBytes$index); + | giantBuffer.write(bytes$index); """.stripMargin } }
