viirya commented on code in PR #213: URL: https://github.com/apache/arrow-datafusion-comet/pull/213#discussion_r1548991099
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -77,18 +80,44 @@ abstract class CometExec extends CometPlan {
    */
   def getByteArrayRdd(): RDD[(Long, ChunkedByteBuffer)] = {
     executeColumnar().mapPartitionsInternal { iter =>
+      serializeBatches(iter)
+    }
+  }
+
+  /**
+   * Serializes a list of `ColumnarBatch` into an output stream.
+   *
+   * @param batches
+   *   the output batches, each batch is a list of Arrow vectors wrapped in `CometVector`
+   * @param out
+   *   the output stream
+   */
+  def serializeBatches(batches: Iterator[ColumnarBatch]): Iterator[(Long, ChunkedByteBuffer)] = {
+    val nativeUtil = new NativeUtil()
+
+    batches.map { batch =>
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
       val out = new DataOutputStream(codec.compressedOutputStream(cbbos))
-      val count = new NativeUtil().serializeBatches(iter, out)
+      val (fieldVectors, batchProviderOpt) = nativeUtil.getBatchFieldVectors(batch)
+      val root = new VectorSchemaRoot(fieldVectors.asJava)
+      val provider = batchProviderOpt.getOrElse(nativeUtil.getDictionaryProvider)
+
+      val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out))
+      writer.start()
+      writer.writeBatch()
+
+      root.clear()
+      writer.end()

Review Comment:
   Previously, `serializeBatches` was wrong: it serialized all batches with a single `ArrowStreamWriter`. This causes wrong results when serializing dictionary arrays, i.e., #241. Each batch can have different dictionary provider content, but `ArrowStreamWriter` writes out the dictionaries once, at the beginning, when it starts to serialize. Later batches therefore use incorrect dictionary values.
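   To illustrate the failure mode, here is a toy model in plain Scala. It is only a sketch and does not use the Arrow API: `Batch`, `Stream`, `writeShared`, `writePerBatch`, and `decode` are hypothetical names, and it assumes a stream carries exactly one dictionary, written up front, followed by index-only batches.

```scala
// Toy model of dictionary-encoded streaming. This is NOT the Arrow API;
// every name below is hypothetical and exists only for illustration.
object DictionaryStreamSketch {

  // A dictionary-encoded batch: each index points into the batch's own dictionary.
  final case class Batch(dictionary: Vector[String], indices: Vector[Int])

  // What a reader sees: one dictionary written at the start, then index-only batches.
  final case class Stream(dictionary: Vector[String], batches: Vector[Vector[Int]])

  // Decodes every batch of a stream against that stream's single dictionary.
  def decode(stream: Stream): Vector[Vector[String]] =
    stream.batches.map(indices => indices.map(idx => stream.dictionary(idx)))

  // Models the old behaviour: one stream for all batches, so only the
  // first batch's dictionary is ever written.
  def writeShared(batches: Seq[Batch]): Seq[Stream] =
    batches.headOption
      .map(first => Stream(first.dictionary, batches.map(_.indices).toVector))
      .toList

  // Models the behaviour in this PR: one stream per batch, each carrying
  // the dictionary that belongs to that batch.
  def writePerBatch(batches: Seq[Batch]): Seq[Stream] =
    batches.map(b => Stream(b.dictionary, Vector(b.indices)))

  def main(args: Array[String]): Unit = {
    val b1 = Batch(Vector("a", "b"), Vector(0, 1, 0))
    val b2 = Batch(Vector("x", "y"), Vector(1, 0, 1))

    // Shared stream: the second batch decodes to b, a, b instead of y, x, y.
    println(writeShared(Seq(b1, b2)).flatMap(decode))

    // Per-batch streams: the second batch decodes to y, x, y as intended.
    println(writePerBatch(Seq(b1, b2)).flatMap(decode))
  }
}
```

   In this model the shared stream decodes the second batch against the first batch's dictionary, which is the kind of wrong result described above. Writing one stream per batch costs a schema and dictionary header for every batch, but keeps each batch's indices paired with its own dictionary.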