This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f72349ed8 [VL] Make ColumnarBatch::getRowBytes leak-safe (#6002)
f72349ed8 is described below

commit f72349ed8b18b40b45428a2c11bb658988c8e97c
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Jun 6 15:32:39 2024 +0800

    [VL] Make ColumnarBatch::getRowBytes leak-safe (#6002)
---
 cpp/core/jni/JniWrapper.cc             | 29 +++++++++++------------------
 cpp/core/memory/ColumnarBatch.cc       | 16 ++++++++--------
 cpp/core/memory/ColumnarBatch.h        |  9 +++++----
 cpp/velox/memory/VeloxColumnarBatch.cc | 10 +++++-----
 cpp/velox/memory/VeloxColumnarBatch.h  |  2 +-
 5 files changed, 30 insertions(+), 36 deletions(-)

diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index f5a6c4bd7..db498f43a 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -72,8 +72,8 @@ static jclass shuffleReaderMetricsClass;
 static jmethodID shuffleReaderMetricsSetDecompressTime;
 static jmethodID shuffleReaderMetricsSetDeserializeTime;
 
-static jclass block_stripes_class;
-static jmethodID block_stripes_constructor;
+static jclass blockStripesClass;
+static jmethodID blockStripesConstructor;
 
 class JavaInputStreamAdaptor final : public arrow::io::InputStream {
  public:
@@ -280,9 +280,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   shuffleReaderMetricsSetDeserializeTime =
       getMethodIdOrError(env, shuffleReaderMetricsClass, "setDeserializeTime", "(J)V");
 
-  block_stripes_class =
+  blockStripesClass =
       createGlobalClassReferenceOrError(env, "Lorg/apache/spark/sql/execution/datasources/BlockStripes;");
-  block_stripes_constructor = env->GetMethodID(block_stripes_class, "<init>", "(J[J[II[B)V");
+  blockStripesConstructor = env->GetMethodID(blockStripesClass, "<init>", "(J[J[II[B)V");
 
   return jniVersion;
 }
@@ -297,7 +297,7 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   env->DeleteGlobalRef(nativeColumnarToRowInfoClass);
   env->DeleteGlobalRef(byteArrayClass);
   env->DeleteGlobalRef(shuffleReaderMetricsClass);
-  env->DeleteGlobalRef(block_stripes_class);
+  env->DeleteGlobalRef(blockStripesClass);
 
   gluten::getJniErrorState()->close();
   gluten::getJniCommonState()->close();
@@ -1224,14 +1224,13 @@ Java_org_apache_gluten_datasource_DatasourceJniWrapper_splitBlockByPartitionAndB
   }
 
   MemoryManager* memoryManager = reinterpret_cast<MemoryManager*>(memoryManagerId);
-  auto result = batch->getRowBytes(0);
-  auto rowBytes = result.first;
+  auto result = batch->toUnsafeRow(0);
+  auto rowBytes = result.data();
   auto newBatchHandle = ctx->objectStore()->save(ctx->select(memoryManager, batch, partitionColIndiceVec));
 
-  auto bytesSize = result.second;
+  auto bytesSize = result.size();
   jbyteArray bytesArray = env->NewByteArray(bytesSize);
   env->SetByteArrayRegion(bytesArray, 0, bytesSize, reinterpret_cast<jbyte*>(rowBytes));
-  delete[] rowBytes;
 
   jlongArray batchArray = env->NewLongArray(1);
   long* cBatchArray = new long[1];
@@ -1239,15 +1238,9 @@ Java_org_apache_gluten_datasource_DatasourceJniWrapper_splitBlockByPartitionAndB
   env->SetLongArrayRegion(batchArray, 0, 1, cBatchArray);
   delete[] cBatchArray;
 
-  jobject block_stripes = env->NewObject(
-      block_stripes_class,
-      block_stripes_constructor,
-      batchHandle,
-      batchArray,
-      nullptr,
-      batch->numColumns(),
-      bytesArray);
-  return block_stripes;
+  jobject blockStripes = env->NewObject(
+      blockStripesClass, blockStripesConstructor, batchHandle, batchArray, nullptr, batch->numColumns(), bytesArray);
+  return blockStripes;
   JNI_METHOD_END(nullptr)
 }
 
diff --git a/cpp/core/memory/ColumnarBatch.cc b/cpp/core/memory/ColumnarBatch.cc
index bb80510ee..23567535d 100644
--- a/cpp/core/memory/ColumnarBatch.cc
+++ b/cpp/core/memory/ColumnarBatch.cc
@@ -43,8 +43,8 @@ int64_t ColumnarBatch::getExportNanos() const {
   return exportNanos_;
 }
 
-std::pair<char*, int> ColumnarBatch::getRowBytes(int32_t rowId) const {
-  throw gluten::GlutenException("Not implemented getRowBytes for ColumnarBatch");
+std::vector<char> ColumnarBatch::toUnsafeRow(int32_t rowId) const {
+  throw gluten::GlutenException("Not implemented toUnsafeRow for ColumnarBatch");
 }
 
 std::ostream& operator<<(std::ostream& os, const ColumnarBatch& columnarBatch) {
@@ -86,8 +86,8 @@ std::shared_ptr<ArrowArray> ArrowColumnarBatch::exportArrowArray() {
   return cArray;
 }
 
-std::pair<char*, int> ArrowColumnarBatch::getRowBytes(int32_t rowId) const {
-  throw gluten::GlutenException("Not implemented getRowBytes for ArrowColumnarBatch");
+std::vector<char> ArrowColumnarBatch::toUnsafeRow(int32_t rowId) const {
+  throw gluten::GlutenException("#toUnsafeRow of ArrowColumnarBatch is not implemented");
 }
 
 ArrowCStructColumnarBatch::ArrowCStructColumnarBatch(
@@ -123,8 +123,8 @@ std::shared_ptr<ArrowArray> ArrowCStructColumnarBatch::exportArrowArray() {
   return cArray_;
 }
 
-std::pair<char*, int> ArrowCStructColumnarBatch::getRowBytes(int32_t rowId) const {
-  throw gluten::GlutenException("Not implemented getRowBytes for ArrowCStructColumnarBatch");
+std::vector<char> ArrowCStructColumnarBatch::toUnsafeRow(int32_t rowId) const {
+  throw gluten::GlutenException("#toUnsafeRow of ArrowCStructColumnarBatch is not implemented");
 }
 
 std::shared_ptr<ColumnarBatch> CompositeColumnarBatch::create(std::vector<std::shared_ptr<ColumnarBatch>> batches) {
@@ -171,8 +171,8 @@ const std::vector<std::shared_ptr<ColumnarBatch>>& CompositeColumnarBatch::getBa
   return batches_;
 }
 
-std::pair<char*, int> CompositeColumnarBatch::getRowBytes(int32_t rowId) const {
-  throw gluten::GlutenException("Not implemented getRowBytes for CompositeColumnarBatch");
+std::vector<char> CompositeColumnarBatch::toUnsafeRow(int32_t rowId) const {
+  throw gluten::GlutenException("#toUnsafeRow of CompositeColumnarBatch is not implemented");
 }
 
 CompositeColumnarBatch::CompositeColumnarBatch(
diff --git a/cpp/core/memory/ColumnarBatch.h b/cpp/core/memory/ColumnarBatch.h
index 4a7b34889..fd8189aa6 100644
--- a/cpp/core/memory/ColumnarBatch.h
+++ b/cpp/core/memory/ColumnarBatch.h
@@ -49,7 +49,8 @@ class ColumnarBatch {
 
   virtual int64_t getExportNanos() const;
 
-  virtual std::pair<char*, int> getRowBytes(int32_t rowId) const;
+  // Serializes one single row to byte array that can be accessed as Spark-compatible unsafe row.
+  virtual std::vector<char> toUnsafeRow(int32_t rowId) const;
 
   friend std::ostream& operator<<(std::ostream& os, const ColumnarBatch& columnarBatch);
 
@@ -75,7 +76,7 @@ class ArrowColumnarBatch final : public ColumnarBatch {
 
   std::shared_ptr<ArrowArray> exportArrowArray() override;
 
-  std::pair<char*, int> getRowBytes(int32_t rowId) const override;
+  std::vector<char> toUnsafeRow(int32_t rowId) const override;
 
  private:
   std::shared_ptr<arrow::RecordBatch> batch_;
@@ -95,7 +96,7 @@ class ArrowCStructColumnarBatch final : public ColumnarBatch {
 
   std::shared_ptr<ArrowArray> exportArrowArray() override;
 
-  std::pair<char*, int> getRowBytes(int32_t rowId) const override;
+  std::vector<char> toUnsafeRow(int32_t rowId) const override;
 
  private:
   std::shared_ptr<ArrowSchema> cSchema_ = std::make_shared<ArrowSchema>();
@@ -120,7 +121,7 @@ class CompositeColumnarBatch final : public ColumnarBatch {
 
   const std::vector<std::shared_ptr<ColumnarBatch>>& getBatches() const;
 
-  std::pair<char*, int> getRowBytes(int32_t rowId) const override;
+  std::vector<char> toUnsafeRow(int32_t rowId) const override;
 
  private:
   explicit CompositeColumnarBatch(
diff --git a/cpp/velox/memory/VeloxColumnarBatch.cc b/cpp/velox/memory/VeloxColumnarBatch.cc
index 83428707b..0d8db3127 100644
--- a/cpp/velox/memory/VeloxColumnarBatch.cc
+++ b/cpp/velox/memory/VeloxColumnarBatch.cc
@@ -143,13 +143,13 @@ std::shared_ptr<ColumnarBatch> VeloxColumnarBatch::select(
   return std::make_shared<VeloxColumnarBatch>(rowVector);
 }
 
-std::pair<char*, int> VeloxColumnarBatch::getRowBytes(int32_t rowId) const {
+std::vector<char> VeloxColumnarBatch::toUnsafeRow(int32_t rowId) const {
   auto fast = std::make_unique<facebook::velox::row::UnsafeRowFast>(rowVector_);
   auto size = fast->rowSize(rowId);
-  char* rowBytes = new char[size];
-  std::memset(rowBytes, 0, size);
-  fast->serialize(0, rowBytes);
-  return std::make_pair(rowBytes, size);
+  std::vector<char> bytes(size);
+  std::memset(bytes.data(), 0, bytes.size());
+  fast->serialize(0, bytes.data());
+  return bytes;
 }
 
 } // namespace gluten
diff --git a/cpp/velox/memory/VeloxColumnarBatch.h b/cpp/velox/memory/VeloxColumnarBatch.h
index c319b7977..6c79f2772 100644
--- a/cpp/velox/memory/VeloxColumnarBatch.h
+++ b/cpp/velox/memory/VeloxColumnarBatch.h
@@ -41,7 +41,7 @@ class VeloxColumnarBatch final : public ColumnarBatch {
 
   std::shared_ptr<ArrowSchema> exportArrowSchema() override;
   std::shared_ptr<ArrowArray> exportArrowArray() override;
-  std::pair<char*, int> getRowBytes(int32_t rowId) const override;
+  std::vector<char> toUnsafeRow(int32_t rowId) const override;
   std::shared_ptr<ColumnarBatch> select(facebook::velox::memory::MemoryPool* pool, std::vector<int32_t> columnIndices);
   facebook::velox::RowVectorPtr getRowVector() const;
   facebook::velox::RowVectorPtr getFlattenedRowVector();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to