viirya commented on code in PR #213:
URL: https://github.com/apache/arrow-datafusion-comet/pull/213#discussion_r1548991099


##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -77,18 +80,44 @@ abstract class CometExec extends CometPlan {
    */
   def getByteArrayRdd(): RDD[(Long, ChunkedByteBuffer)] = {
     executeColumnar().mapPartitionsInternal { iter =>
+      serializeBatches(iter)
+    }
+  }
+
+  /**
+   * Serializes a list of `ColumnarBatch` into an output stream.
+   *
+   * @param batches
+   *   the output batches, each batch is a list of Arrow vectors wrapped in `CometVector`
+   * @param out
+   *   the output stream
+   */
+  def serializeBatches(batches: Iterator[ColumnarBatch]): Iterator[(Long, ChunkedByteBuffer)] = {
+    val nativeUtil = new NativeUtil()
+
+    batches.map { batch =>
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
       val out = new DataOutputStream(codec.compressedOutputStream(cbbos))
 
-      val count = new NativeUtil().serializeBatches(iter, out)
+      val (fieldVectors, batchProviderOpt) = nativeUtil.getBatchFieldVectors(batch)
+      val root = new VectorSchemaRoot(fieldVectors.asJava)
+      val provider = batchProviderOpt.getOrElse(nativeUtil.getDictionaryProvider)
+
+      val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out))
+      writer.start()
+      writer.writeBatch()
+
+      root.clear()
+      writer.end()

Review Comment:
   Previously, `serializeBatches` was wrong: it serialized all batches with a single `ArrowStreamWriter`. This produces wrong results when serializing dictionary arrays (see #241).
   
   Each batch can have different dictionary provider contents. However, when `ArrowStreamWriter` starts serializing, it writes out the dictionaries at the beginning of the stream, so later batches are decoded against incorrect dictionary values.
   
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.