zhztheplayer commented on code in PR #6009:
URL: https://github.com/apache/incubator-gluten/pull/6009#discussion_r1634080874


##########
cpp/core/jni/JniCommon.cc:
##########
@@ -65,3 +65,61 @@ gluten::Runtime* gluten::getRuntime(JNIEnv* env, jobject runtimeAware) {
   GLUTEN_CHECK(ctx != nullptr, "FATAL: resource instance should not be null.");
   return ctx;
 }
+
+std::unique_ptr<gluten::JniColumnarBatchIterator> gluten::makeJniColumnarBatchIterator(
+    JNIEnv* env,
+    jobject jColumnarBatchItr,
+    gluten::Runtime* runtime,
+    std::shared_ptr<ArrowWriter> writer) {
+  return std::make_unique<JniColumnarBatchIterator>(env, jColumnarBatchItr, runtime, writer);
+}
+
+gluten::JniColumnarBatchIterator::JniColumnarBatchIterator(
+    JNIEnv* env,
+    jobject jColumnarBatchItr,
+    gluten::Runtime* runtime,
+    std::shared_ptr<ArrowWriter> writer)
+    : runtime_(runtime), writer_(writer) {
+  // IMPORTANT: DO NOT USE LOCAL REF IN DIFFERENT THREAD
+  if (env->GetJavaVM(&vm_) != JNI_OK) {
+    std::string errorMessage = "Unable to get JavaVM instance";
+    throw gluten::GlutenException(errorMessage);
+  }
+  serializedColumnarBatchIteratorClass_ =
+      createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/ColumnarBatchInIterator;");
+  serializedColumnarBatchIteratorHasNext_ =
+      getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "hasNext", "()Z");
+  serializedColumnarBatchIteratorNext_ = getMethodIdOrError(env, serializedColumnarBatchIteratorClass_, "next", "()J");
+  jColumnarBatchItr_ = env->NewGlobalRef(jColumnarBatchItr);
+}
+
+gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
+  JNIEnv* env;
+  attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+  env->DeleteGlobalRef(jColumnarBatchItr_);
+  env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_);
+  vm_->DetachCurrentThread();
+}
+
+std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
+  JNIEnv* env;
+  attachCurrentThreadAsDaemonOrThrow(vm_, &env);
+  if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
+    checkException(env);
+    return nullptr; // stream ended
+  }
+
+  checkException(env);
+  jlong handle = env->CallLongMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorNext_);
+  checkException(env);
+  auto batch = runtime_->objectStore()->retrieve<ColumnarBatch>(handle);
+  if (writer_ != nullptr) {
+    // save snapshot of the batch to file
+    std::shared_ptr<ArrowSchema> schema = batch->exportArrowSchema();
+    std::shared_ptr<ArrowArray> array = batch->exportArrowArray();
+    auto rb = gluten::arrowGetOrThrow(arrow::ImportRecordBatch(array.get(), schema.get()));
+    GLUTEN_THROW_NOT_OK(writer_->initWriter(*(rb->schema().get())));
+    GLUTEN_THROW_NOT_OK(writer_->writeInBatches(rb));
+  }
+  return batch;
+}

Review Comment:
   code movement



##########
cpp/core/jni/JniCommon.h:
##########
@@ -251,6 +258,40 @@ DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kLong, jlongArray, Long)
 DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kFloat, jfloatArray, Float)
 DEFINE_SAFE_GET_PRIMITIVE_ARRAY_FUNCTIONS(kDouble, jdoubleArray, Double)
 
+class JniColumnarBatchIterator : public ColumnarBatchIterator {
+ public:
+  explicit JniColumnarBatchIterator(
+      JNIEnv* env,
+      jobject jColumnarBatchItr,
+      Runtime* runtime,
+      std::shared_ptr<ArrowWriter> writer);
+
+  // singleton
+  JniColumnarBatchIterator(const JniColumnarBatchIterator&) = delete;
+  JniColumnarBatchIterator(JniColumnarBatchIterator&&) = delete;
+  JniColumnarBatchIterator& operator=(const JniColumnarBatchIterator&) = delete;
+  JniColumnarBatchIterator& operator=(JniColumnarBatchIterator&&) = delete;
+
+  virtual ~JniColumnarBatchIterator();
+
+  std::shared_ptr<ColumnarBatch> next() override;
+
+ private:
+  JavaVM* vm_;
+  jobject jColumnarBatchItr_;
+  Runtime* runtime_;
+  std::shared_ptr<ArrowWriter> writer_;
+
+  jclass serializedColumnarBatchIteratorClass_;
+  jmethodID serializedColumnarBatchIteratorHasNext_;
+  jmethodID serializedColumnarBatchIteratorNext_;
+};
+
+std::unique_ptr<JniColumnarBatchIterator> makeJniColumnarBatchIterator(
+    JNIEnv* env,
+    jobject jColumnarBatchItr,
+    Runtime* runtime,
+    std::shared_ptr<ArrowWriter> writer);

Review Comment:
   code movement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to