This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 944d85900 [GLUTEN-5787][CH]Make pipeline and shuffle exit gracefully 
when tasks in executors are killed or interrupted (#5839)
944d85900 is described below

commit 944d859004a83617a463b7e1a17bac378d5a1d08
Author: 李扬 <[email protected]>
AuthorDate: Thu Jun 6 11:18:47 2024 +0800

    [GLUTEN-5787][CH]Make pipeline and shuffle exit gracefully when tasks in 
executors are killed or interrupted (#5839)
    
    What changes were proposed in this pull request?
    Changes:
    
    Clean up code: remove unused JNI methods and classes under cpp-ch
    Support cancellation for all Gluten processors. It is triggered when a task
is killed or shut down.
    Make sure freeing off-heap memory does not throw an exception. Ref:
https://zhuanlan.zhihu.com/p/65454580
    (Fixes: #5787 #5823)
    
    How was this patch tested?
    Manual
---
 .../apache/gluten/vectorized/BatchIterator.java    |  11 ++
 .../vectorized/CHNativeExpressionEvaluator.java    |  10 +-
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  21 +++-
 .../execution/NativeFileScanColumnarRDD.scala      |  12 ++-
 .../metrics/GlutenClickHouseMetricsUTUtils.scala   |   4 +-
 cpp-ch/local-engine/Common/BlockIterator.cpp       |   6 ++
 cpp-ch/local-engine/Common/QueryContext.cpp        |   2 +-
 .../Operator/BlockCoalesceOperator.cpp             |  53 ----------
 .../local-engine/Operator/BlockCoalesceOperator.h  |  47 ---------
 .../Operator/BlocksBufferPoolTransform.cpp         |   2 +-
 cpp-ch/local-engine/Operator/EmptyProjectStep.cpp  |   2 +-
 cpp-ch/local-engine/Operator/ExpandTransform.cpp   |  12 ++-
 .../Operator/GraceMergingAggregatedStep.cpp        |   6 +-
 .../Operator/PartitionColumnFillingTransform.h     |   4 +-
 .../Operator/StreamingAggregatingStep.cpp          |   7 +-
 .../local-engine/Parser/SerializedPlanParser.cpp   |   7 ++
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |  15 +--
 .../local-engine/Storages/SourceFromJavaIter.cpp   |  15 +--
 .../SubstraitSource/SubstraitFileSource.cpp        |  23 ++++-
 .../Storages/SubstraitSource/SubstraitFileSource.h |  26 ++++-
 .../jni/ReservationListenerWrapper.cpp             |   7 ++
 .../local-engine/jni/ReservationListenerWrapper.h  |   2 +
 cpp-ch/local-engine/jni/jni_common.h               |  27 +++++
 cpp-ch/local-engine/jni/jni_error.h                |   3 +-
 cpp-ch/local-engine/local_engine_jni.cpp           | 112 ++-------------------
 25 files changed, 190 insertions(+), 246 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
index 5698caf02..d674c6e90 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
@@ -23,9 +23,11 @@ import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class BatchIterator extends GeneralOutIterator {
   private final long handle;
+  private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
   public BatchIterator(long handle) {
     super();
@@ -46,6 +48,8 @@ public class BatchIterator extends GeneralOutIterator {
 
   private native void nativeClose(long nativeHandle);
 
+  private native void nativeCancel(long nativeHandle);
+
   private native IMetrics nativeFetchMetrics(long nativeHandle);
 
   @Override
@@ -76,4 +80,11 @@ public class BatchIterator extends GeneralOutIterator {
   public void closeInternal() {
     nativeClose(handle);
   }
+
+  // Used to cancel native pipeline execution when spark task is killed
+  public final void cancel() {
+    if (cancelled.compareAndSet(false, true)) {
+      nativeCancel(handle);
+    }
+  }
 }
diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
index 0d307d231..b8b4138dc 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
@@ -83,7 +83,7 @@ public class CHNativeExpressionEvaluator {
 
   // Used by WholeStageTransform to create the native computing pipeline and
   // return a columnar result iterator.
-  public GeneralOutIterator createKernelWithBatchIterator(
+  public BatchIterator createKernelWithBatchIterator(
       byte[] wsPlan,
       byte[][] splitInfo,
       List<GeneralInIterator> iterList,
@@ -97,11 +97,11 @@ public class CHNativeExpressionEvaluator {
             iterList.toArray(new GeneralInIterator[0]),
             buildNativeConf(getNativeBackendConf()),
             materializeInput);
-    return createOutIterator(handle);
+    return createBatchIterator(handle);
   }
 
   // Only for UT.
-  public GeneralOutIterator createKernelWithBatchIterator(
+  public BatchIterator createKernelWithBatchIterator(
       long allocId, byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> 
iterList) {
     long handle =
         jniWrapper.nativeCreateKernelWithIterator(
@@ -111,10 +111,10 @@ public class CHNativeExpressionEvaluator {
             iterList.toArray(new GeneralInIterator[0]),
             buildNativeConf(getNativeBackendConf()),
             false);
-    return createOutIterator(handle);
+    return createBatchIterator(handle);
   }
 
-  private GeneralOutIterator createOutIterator(long nativeHandle) {
+  private BatchIterator createBatchIterator(long nativeHandle) {
     return new BatchIterator(nativeHandle);
   }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index bc16c2d77..63f7eeb79 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.substrait.rel._
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.utils.LogLevelUtil
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}
+import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
CloseableCHColumnBatchIterator, GeneralInIterator}
 
 import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
 import org.apache.spark.affinity.CHAffinity
@@ -206,13 +206,19 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
     val splitInfoByteArray = inputPartition
       .asInstanceOf[GlutenPartition]
       .splitInfosByteArray
-    val resIter: GeneralOutIterator =
+    val resIter =
       transKernel.createKernelWithBatchIterator(
         inputPartition.plan,
         splitInfoByteArray,
         inBatchIters,
         false)
 
+    context.addTaskFailureListener(
+      (ctx, _) => {
+        if (ctx.isInterrupted()) {
+          resIter.cancel()
+        }
+      })
     context.addTaskCompletionListener[Unit](_ => resIter.close())
     val iter = new Iterator[Any] {
       private val inputMetrics = context.taskMetrics().inputMetrics
@@ -304,6 +310,7 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       }
     }
     var closed = false
+    val cancelled = false
 
     def close(): Unit = {
       closed = true
@@ -311,6 +318,16 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       // relationHolder.clear()
     }
 
+    def cancel(): Unit = {
+      nativeIterator.cancel()
+    }
+
+    context.addTaskFailureListener(
+      (ctx, _) => {
+        if (ctx.isInterrupted()) {
+          cancel()
+        }
+      })
     context.addTaskCompletionListener[Unit](_ => close())
     new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
index 624a4390d..af512934b 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}
+import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
CloseableCHColumnBatchIterator, GeneralInIterator}
 
 import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -47,7 +47,7 @@ class NativeFileScanColumnarRDD(
       .asInstanceOf[GlutenPartition]
       .splitInfosByteArray
 
-    val resIter: GeneralOutIterator = GlutenTimeMetric.millis(scanTime) {
+    val resIter = GlutenTimeMetric.millis(scanTime) {
       _ =>
         val transKernel = new CHNativeExpressionEvaluator()
         val inBatchIters = new util.ArrayList[GeneralInIterator]()
@@ -58,6 +58,14 @@ class NativeFileScanColumnarRDD(
           false
         )
     }
+    TaskContext
+      .get()
+      .addTaskFailureListener(
+        (ctx, _) => {
+          if (ctx.isInterrupted()) {
+            resIter.cancel()
+          }
+        })
     TaskContext.get().addTaskCompletionListener[Unit](_ => resIter.close())
     val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
       var scanTotalTime = 0L
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
index bc395ca88..ee0ad8039 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
 import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators
 import org.apache.gluten.metrics.{MetricsUtil, NativeMetrics}
 import org.apache.gluten.utils.SubstraitPlanPrinterUtil
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
GeneralInIterator, GeneralOutIterator}
+import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, 
GeneralInIterator}
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -47,7 +47,7 @@ object GlutenClickHouseMetricsUTUtils {
 
     val transKernel = new CHNativeExpressionEvaluator()
     val mockMemoryAllocator = CHNativeMemoryAllocators.contextInstanceForUT()
-    val resIter: GeneralOutIterator = 
transKernel.createKernelWithBatchIterator(
+    val resIter = transKernel.createKernelWithBatchIterator(
       mockMemoryAllocator.getNativeInstanceId,
       substraitPlan.toByteArray,
       new Array[Array[Byte]](0),
diff --git a/cpp-ch/local-engine/Common/BlockIterator.cpp 
b/cpp-ch/local-engine/Common/BlockIterator.cpp
index 1a76f646b..464701893 100644
--- a/cpp-ch/local-engine/Common/BlockIterator.cpp
+++ b/cpp-ch/local-engine/Common/BlockIterator.cpp
@@ -34,24 +34,30 @@ void local_engine::BlockIterator::checkNextValid()
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Block iterator 
next should after hasNext");
     }
 }
+
 void BlockIterator::produce()
 {
     consumed = false;
 }
+
 void BlockIterator::consume()
 {
     consumed = true;
 }
+
 bool BlockIterator::isConsumed() const
 {
     return consumed;
 }
+
 DB::Block & BlockIterator::currentBlock()
 {
     return cached_block;
 }
+
 void BlockIterator::setCurrentBlock(DB::Block & block)
 {
     cached_block = block;
 }
+
 }
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp 
b/cpp-ch/local-engine/Common/QueryContext.cpp
index c659e6f34..f4d39c612 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -67,7 +67,7 @@ int64_t initializeQuery(ReservationListenerWrapperPtr 
listener)
         else
             listener->reserve(size);
     };
-    CurrentMemoryTracker::before_free = [listener](Int64 size) -> void { 
listener->free(size); };
+    CurrentMemoryTracker::before_free = [listener](Int64 size) -> void { 
listener->tryFree(size); };
     CurrentMemoryTracker::current_memory = [listener]() -> Int64 { return 
listener->currentMemory(); };
     allocator_map.insert(allocator_id, allocator_context);
     return allocator_id;
diff --git a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp 
b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp
deleted file mode 100644
index 756249e8a..000000000
--- a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "BlockCoalesceOperator.h"
-#include <Core/Block.h>
-
-namespace local_engine
-{
-
-void BlockCoalesceOperator::mergeBlock(DB::Block & block)
-{
-    block_buffer.add(block, 0, static_cast<int>(block.rows()));
-}
-
-bool BlockCoalesceOperator::isFull()
-{
-    return block_buffer.size() >= buf_size;
-}
-
-DB::Block * BlockCoalesceOperator::releaseBlock()
-{
-    clearCache();
-    cached_block = new DB::Block(block_buffer.releaseColumns());
-    return cached_block;
-}
-
-BlockCoalesceOperator::~BlockCoalesceOperator()
-{
-    clearCache();
-}
-
-void BlockCoalesceOperator::clearCache()
-{
-    if (cached_block)
-    {
-        delete cached_block;
-        cached_block = nullptr;
-    }
-}
-}
diff --git a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h 
b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h
deleted file mode 100644
index 2b67b40ce..000000000
--- a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <Shuffle/ShuffleSplitter.h>
-
-namespace DB
-{
-class Block;
-}
-
-namespace local_engine
-{
-
-class BlockCoalesceOperator
-{
-public:
-    explicit BlockCoalesceOperator(size_t buf_size_) : buf_size(buf_size_) { }
-    ~BlockCoalesceOperator();
-
-    void mergeBlock(DB::Block & block);
-    bool isFull();
-    DB::Block * releaseBlock();
-
-private:
-    void clearCache();
-
-    size_t buf_size;
-    ColumnsBuffer block_buffer;
-    DB::Block * cached_block = nullptr;
-
-};
-}
diff --git a/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp 
b/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
index 3427a81c6..16a5bd5d2 100644
--- a/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
+++ b/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
@@ -43,7 +43,7 @@ DB::IProcessor::Status BlocksBufferPoolTransform::prepare()
 {
     auto & output = outputs.front();
     auto & input = inputs.front();
-    if (output.isFinished())
+    if (output.isFinished() || isCancelled())
     {
         input.close();
         return Status::Finished;
diff --git a/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp 
b/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
index 58cb33e59..62991585f 100644
--- a/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
+++ b/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
@@ -39,7 +39,7 @@ public:
     {
         auto & output = outputs.front();
         auto & input = inputs.front();
-        if (output.isFinished())
+        if (output.isFinished() || isCancelled())
         {
             input.close();
             return Status::Finished;
diff --git a/cpp-ch/local-engine/Operator/ExpandTransform.cpp 
b/cpp-ch/local-engine/Operator/ExpandTransform.cpp
index d48d48439..106c38e2d 100644
--- a/cpp-ch/local-engine/Operator/ExpandTransform.cpp
+++ b/cpp-ch/local-engine/Operator/ExpandTransform.cpp
@@ -48,7 +48,7 @@ ExpandTransform::Status ExpandTransform::prepare()
     auto & output = outputs.front();
     auto & input = inputs.front();
 
-    if (output.isFinished())
+    if (output.isFinished() || isCancelled())
     {
         input.close();
         return Status::Finished;
@@ -79,12 +79,12 @@ ExpandTransform::Status ExpandTransform::prepare()
 
         if (!input.hasData())
             return Status::NeedData;
-        
+
         input_chunk = input.pull(true);
         has_input = true;
         expand_expr_iterator = 0;
     }
-    
+
     return Status::Ready;
 }
 
@@ -92,6 +92,7 @@ void ExpandTransform::work()
 {
     if (expand_expr_iterator >= project_set_exprs.getExpandRows())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, 
"expand_expr_iterator >= project_set_exprs.getExpandRows()");
+
     const auto & original_cols = input_chunk.getColumns();
     size_t rows = input_chunk.getNumRows();
     DB::Columns cols;
@@ -139,8 +140,9 @@ void ExpandTransform::work()
         }
     }
     output_chunk = DB::Chunk(cols, rows);
-    expand_expr_iterator += 1;
-    has_output = expand_expr_iterator <= project_set_exprs.getExpandRows();
+    has_output = true;
+
+    ++expand_expr_iterator;
     has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
 }
 }
diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp 
b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
index ca86785fe..a9a2df276 100644
--- a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
+++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
@@ -146,7 +146,7 @@ GraceMergingAggregatedTransform::Status 
GraceMergingAggregatedTransform::prepare
 {
     auto & output = outputs.front();
     auto & input = inputs.front();
-    if (output.isFinished())
+    if (output.isFinished() || isCancelled())
     {
         input.close();
         return Status::Finished;
@@ -224,7 +224,7 @@ void GraceMergingAggregatedTransform::work()
                 block_converter = 
prepareBucketOutputBlocks(current_bucket_index);
                 if (block_converter)
                     break;
-                current_bucket_index++;  
+                current_bucket_index++;
             }
         }
         if (!block_converter)
@@ -455,7 +455,7 @@ std::unique_ptr<AggregateDataBlockConverter> 
GraceMergingAggregatedTransform::pr
             block = {};
         }
     }
-    
+
     if (buffer_file_stream.original_file_stream)
     {
         buffer_file_stream.original_file_stream->finishWriting();
diff --git a/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h 
b/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
index 2eac10f39..692991b3f 100644
--- a/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
+++ b/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
@@ -25,9 +25,11 @@ class PartitionColumnFillingTransform : public 
DB::ISimpleTransform
 public:
     PartitionColumnFillingTransform(
         const DB::Block & input_, const DB::Block & output_, const String & 
partition_col_name_, const String & partition_col_value_);
-    void transform(DB::Chunk & chunk) override;
+
     String getName() const override { return 
"PartitionColumnFillingTransform"; }
 
+    void transform(DB::Chunk & chunk) override;
+
 private:
     DB::ColumnPtr createPartitionColumn();
 
diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp 
b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
index 698d353b1..65d77f8e9 100644
--- a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
+++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
@@ -67,11 +67,12 @@ StreamingAggregatingTransform::Status 
StreamingAggregatingTransform::prepare()
 {
     auto & output = outputs.front();
     auto & input = inputs.front();
-    if (output.isFinished())
+    if (output.isFinished() || isCancelled())
     {
         input.close();
         return Status::Finished;
     }
+
     if (has_output)
     {
         if (output.canPush())
@@ -140,10 +141,10 @@ bool StreamingAggregatingTransform::needEvict()
 
     auto max_mem_used = 
static_cast<size_t>(context->getSettingsRef().max_memory_usage * 
max_allowed_memory_usage_ratio);
     auto current_result_rows = data_variants->size();
-    /// avoid evict empty or too small aggregated results. 
+    /// avoid evict empty or too small aggregated results.
     if (current_result_rows < aggregated_keys_before_evict)
         return false;
-    
+
     /// If the grouping keys is high cardinality, we should evict data 
variants early, and avoid to use a big
     /// hash table.
     if (static_cast<double>(total_output_rows)/total_input_rows > 
high_cardinality_threshold)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index c2bcd1a36..1c907035a 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -2043,6 +2043,7 @@ LocalExecutor::~LocalExecutor()
 {
     if (context->getConfigRef().getBool("dump_pipeline", false))
         LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Dump pipeline:\n{}", 
dumpPipeline());
+
     if (spark_buffer)
     {
         ch_column_to_spark_row->freeMem(spark_buffer->address, 
spark_buffer->size);
@@ -2166,6 +2167,12 @@ Block * LocalExecutor::nextColumnar()
     return columnar_batch;
 }
 
+void LocalExecutor::cancel()
+{
+    if (executor)
+        executor->cancel();
+}
+
 Block & LocalExecutor::getHeader()
 {
     return header;
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 73448b069..d0a16ec71 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -402,21 +402,26 @@ class LocalExecutor : public BlockIterator
 public:
     LocalExecutor() = default;
     explicit LocalExecutor(ContextPtr context);
+    ~LocalExecutor();
+
     void execute(QueryPlanPtr query_plan);
     SparkRowInfoPtr next();
     Block * nextColumnar();
     bool hasNext();
-    ~LocalExecutor();
 
-    Block & getHeader();
+    /// Stop execution, used when task receives shutdown command or executor 
receives SIGTERM signal
+    void cancel();
 
+    Block & getHeader();
     RelMetricPtr getMetric() const { return metric; }
     void setMetric(RelMetricPtr metric_) { metric = metric_; }
-
     void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) { 
extra_plan_holder = std::move(extra_plan_holder_); }
-
 private:
     std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
+
+    /// Dump processor runtime information to log
+    std::string dumpPipeline();
+
     QueryPipeline query_pipeline;
     std::unique_ptr<PullingPipelineExecutor> executor;
     Block header;
@@ -427,8 +432,6 @@ private:
     RelMetricPtr metric;
     std::vector<QueryPlanPtr> extra_plan_holder;
 
-    /// Dump processor runtime information to log
-    std::string dumpPipeline();
 };
 
 
diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp 
b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
index 54d1d253e..37501e985 100644
--- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
+++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
@@ -53,12 +53,11 @@ static DB::Block getRealHeader(const DB::Block & header)
 DB::Block * SourceFromJavaIter::peekBlock(JNIEnv * env, jobject java_iter)
 {
     jboolean has_next = safeCallBooleanMethod(env, java_iter, 
serialized_record_batch_iterator_hasNext);
-    if (has_next)
-    {
-        jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env, 
java_iter, serialized_record_batch_iterator_next));
-        return reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
-    }
-    return nullptr;
+    if (!has_next)
+        return nullptr;
+
+    jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env, 
java_iter, serialized_record_batch_iterator_next));
+    return reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
 }
 
 
@@ -75,6 +74,9 @@ SourceFromJavaIter::SourceFromJavaIter(
 
 DB::Chunk SourceFromJavaIter::generate()
 {
+    if (isCancelled())
+        return {};
+
     GET_JNIENV(env)
     SCOPE_EXIT({CLEAN_JNIENV});
 
@@ -152,6 +154,7 @@ void SourceFromJavaIter::convertNullable(DB::Chunk & chunk)
     chunk.setColumns(columns, rows);
 }
 
+
 DB::ColumnPtr SourceFromJavaIter::convertNestedNullable(const DB::ColumnPtr & 
column, const DB::DataTypePtr & target_type)
 {
     DB::WhichDataType column_type(column->getDataType());
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index 80dccf759..5b872244e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -104,6 +104,9 @@ DB::Chunk SubstraitFileSource::generate()
 
 bool SubstraitFileSource::tryPrepareReader()
 {
+    if (isCancelled())
+        return false;
+
     if (file_reader)
         return true;
 
@@ -140,6 +143,13 @@ bool SubstraitFileSource::tryPrepareReader()
     return true;
 }
 
+
+void SubstraitFileSource::onCancel()
+{
+    if (file_reader)
+        file_reader->cancel();
+}
+
 DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type, 
const DB::Field & field, size_t rows)
 {
     auto nested_type = DB::removeNullable(data_type);
@@ -280,9 +290,13 @@ 
ConstColumnsFileReader::ConstColumnsFileReader(FormatFilePtr file_, DB::ContextP
     remained_rows = *rows;
 }
 
+
 bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
 {
-    if (!remained_rows) [[unlikely]]
+    if (isCancelled())
+        return false;
+
+    if (!remained_rows)
         return false;
 
     size_t to_read_rows = 0;
@@ -296,6 +310,7 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
         to_read_rows = block_size;
         remained_rows -= block_size;
     }
+
     DB::Columns res_columns;
     if (const size_t col_num = header.columns())
     {
@@ -307,8 +322,9 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
             auto type = col_with_name_and_type.type;
             const auto & name = col_with_name_and_type.name;
             auto it = partition_values.find(name);
-            if (it == partition_values.end()) [[unlikely]]
+            if (it == partition_values.end())
                 throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknow 
partition column : {}", name);
+
             res_columns.emplace_back(createColumn(it->second, type, 
to_read_rows));
         }
     }
@@ -331,6 +347,9 @@ NormalFileReader::NormalFileReader(
 
 bool NormalFileReader::pull(DB::Chunk & chunk)
 {
+    if (isCancelled())
+        return false;
+
     DB::Chunk raw_chunk = input_format->input->generate();
     const size_t rows = raw_chunk.getNumRows();
     if (!rows)
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index 973f3cd35..650ec5d96 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -34,6 +34,15 @@ public:
     virtual ~FileReaderWrapper() = default;
     virtual bool pull(DB::Chunk & chunk) = 0;
 
+    void cancel()
+    {
+        bool already_cancelled = is_cancelled.exchange(true, 
std::memory_order_acq_rel);
+        if (!already_cancelled)
+            onCancel();
+    }
+
+    bool isCancelled() const { return 
is_cancelled.load(std::memory_order_acquire); }
+
     /// Apply key condition to the reader, if use_local_format is true, 
column_index_filter will be used
     /// otherwise it will be ignored
     virtual void applyKeyCondition(
@@ -42,7 +51,11 @@ public:
     }
 
 protected:
+    virtual void onCancel() {};
+
     FormatFilePtr file;
+    std::atomic<bool> is_cancelled{false};
+
 
     static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const 
DB::Field & field, size_t rows);
     static DB::ColumnPtr createColumn(const String & value, DB::DataTypePtr 
type, size_t rows);
@@ -68,10 +81,14 @@ public:
     }
 
 private:
+    void onCancel() override
+    {
+        input_format->input->cancel();
+    }
+
     DB::ContextPtr context;
     DB::Block to_read_header;
     DB::Block output_header;
-
     FormatFile::InputFormatPtr input_format;
 };
 
@@ -89,6 +106,7 @@ public:
     ConstColumnsFileReader(
         FormatFilePtr file_, DB::ContextPtr context_, const DB::Block & 
header_, size_t block_size_ = DB::DEFAULT_BLOCK_SIZE);
     ~ConstColumnsFileReader() override = default;
+
     bool pull(DB::Chunk & chunk) override;
 
 private:
@@ -112,6 +130,9 @@ protected:
     DB::Chunk generate() override;
 
 private:
+    bool tryPrepareReader();
+    void onCancel() override;
+
     DB::ContextPtr context;
     DB::Block output_header; /// Sample header may contains partitions keys
     DB::Block to_read_header; // Sample header not include partition keys
@@ -120,9 +141,6 @@ private:
     UInt32 current_file_index = 0;
     std::unique_ptr<FileReaderWrapper> file_reader;
     ReadBufferBuilderPtr read_buffer_builder;
-
     ColumnIndexFilterPtr column_index_filter;
-
-    bool tryPrepareReader();
 };
 }
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp 
b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
index 65b29c2a2..dad0ecf66 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
@@ -58,6 +58,13 @@ void ReservationListenerWrapper::free(int64_t size)
     CLEAN_JNIENV
 }
 
+void ReservationListenerWrapper::tryFree(int64_t size)
+{
+    GET_JNIENV(env)
+    tryCallVoidMethod(env, listener, reservation_listener_unreserve, size);
+    CLEAN_JNIENV
+}
+
 size_t ReservationListenerWrapper::currentMemory()
 {
     GET_JNIENV(env)
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h 
b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
index 1dfb3671f..a4d26cb54 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
@@ -35,6 +35,8 @@ public:
     void reserve(int64_t size);
     void reserveOrThrow(int64_t size);
     void free(int64_t size);
+    /// Make sure destructors in the CH backend do not throw exceptions when freeing memory
+    void tryFree(int64_t size);
     size_t currentMemory();
 
 
diff --git a/cpp-ch/local-engine/jni/jni_common.h 
b/cpp-ch/local-engine/jni/jni_common.h
index c1cc805aa..8d1437083 100644
--- a/cpp-ch/local-engine/jni/jni_common.h
+++ b/cpp-ch/local-engine/jni/jni_common.h
@@ -28,6 +28,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int LOGICAL_ERROR;
+    extern const int UNKNOWN_TYPE;
 }
 }
 
@@ -62,6 +63,24 @@ jbyteArray stringTojbyteArray(JNIEnv * env, const 
std::string & str);
         throw DB::Exception::createRuntime(DB::ErrorCodes::LOGICAL_ERROR, 
msg); \
     }
 
+#define TRY_LOCAL_ENGINE_JNI_JMETHOD_START
+#define TRY_LOCAL_ENGINE_JNI_JMETHOD_END(env) \
+    if ((env)->ExceptionCheck()) \
+    { \
+        LOG_ERROR(&Poco::Logger::get("local_engine"), "Enter java exception 
handle."); \
+        auto excp = (env)->ExceptionOccurred(); \
+        (env)->ExceptionDescribe(); \
+        (env)->ExceptionClear(); \
+        jclass cls = (env)->GetObjectClass(excp); \
+        jmethodID mid = env->GetMethodID(cls, "toString", 
"()Ljava/lang/String;"); \
+        jstring jmsg = static_cast<jstring>((env)->CallObjectMethod(excp, 
mid)); \
+        const char * nmsg = (env)->GetStringUTFChars(jmsg, NULL); \
+        std::string msg = std::string(nmsg); \
+        env->ReleaseStringUTFChars(jmsg, nmsg); \
+        LOG_WARNING(&Poco::Logger::get("local_engine"), "Ignore java 
exception: {}", msg); \
+    }
+
+
 template <typename... Args>
 jobject safeCallObjectMethod(JNIEnv * env, jobject obj, jmethodID method_id, 
Args... args)
 {
@@ -106,6 +125,14 @@ void safeCallVoidMethod(JNIEnv * env, jobject obj, 
jmethodID method_id, Args...
     LOCAL_ENGINE_JNI_JMETHOD_END(env)
 }
 
+template <typename... Args>
+void tryCallVoidMethod(JNIEnv * env, jobject obj, jmethodID method_id, Args... 
args)
+{
+    TRY_LOCAL_ENGINE_JNI_JMETHOD_START
+    env->CallVoidMethod(obj, method_id, args...);
+    TRY_LOCAL_ENGINE_JNI_JMETHOD_END(env)
+}
+
 template <typename... Args>
 jlong safeCallStaticLongMethod(JNIEnv * env, jclass clazz, jmethodID 
method_id, Args... args)
 {
diff --git a/cpp-ch/local-engine/jni/jni_error.h 
b/cpp-ch/local-engine/jni/jni_error.h
index 216a5da93..c6f46bc8f 100644
--- a/cpp-ch/local-engine/jni/jni_error.h
+++ b/cpp-ch/local-engine/jni/jni_error.h
@@ -37,6 +37,8 @@ public:
     ~JniErrorsGlobalState() = default;
 
     static JniErrorsGlobalState & instance();
+    static void throwException(JNIEnv * env, jclass exception_class, const 
std::string & message, const std::string & stack_trace = "");
+
     void initialize(JNIEnv * env_);
     void destroy(JNIEnv * env);
 
@@ -48,7 +50,6 @@ public:
 
     void throwException(JNIEnv * env, const DB::Exception & e);
     void throwException(JNIEnv * env, const std::exception & e);
-    static void throwException(JNIEnv * env, jclass exception_class, const 
std::string & message, const std::string & stack_trace = "");
     void throwRuntimeException(JNIEnv * env, const std::string & message, 
const std::string & stack_trace = "");
 
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 1c088720d..be28b9fab 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -22,7 +22,6 @@
 #include <Compression/CompressedReadBuffer.h>
 #include <DataTypes/DataTypeNullable.h>
 #include <Join/BroadCastJoinBuilder.h>
-#include <Operator/BlockCoalesceOperator.h>
 #include <Parser/CHColumnToSparkRow.h>
 #include <Parser/MergeTreeRelParser.h>
 #include <Parser/RelParser.h>
@@ -285,6 +284,7 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
     plan_string.assign(reinterpret_cast<const char *>(plan_address), 
plan_size);
     auto query_plan = parser.parse(plan_string);
     local_engine::LocalExecutor * executor = new 
local_engine::LocalExecutor(query_context);
+    LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
     executor->setMetric(parser.getMetric());
     executor->setExtraPlanHolder(parser.extra_plan_holder);
     executor->execute(std::move(query_plan));
@@ -294,44 +294,6 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
-JNIEXPORT jboolean Java_org_apache_gluten_row_RowIterator_nativeHasNext(JNIEnv 
* env, jobject /*obj*/, jlong executor_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
-    return executor->hasNext();
-    LOCAL_ENGINE_JNI_METHOD_END(env, false)
-}
-
-JNIEXPORT jobject Java_org_apache_gluten_row_RowIterator_nativeNext(JNIEnv * 
env, jobject /*obj*/, jlong executor_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
-    local_engine::SparkRowInfoPtr spark_row_info = executor->next();
-
-    auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows());
-    const auto * offsets_src = reinterpret_cast<const jlong 
*>(spark_row_info->getOffsets().data());
-    env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(), 
offsets_src);
-    auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows());
-    const auto * lengths_src = reinterpret_cast<const jlong 
*>(spark_row_info->getLengths().data());
-    env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(), 
lengths_src);
-    int64_t address = 
reinterpret_cast<int64_t>(spark_row_info->getBufferAddress());
-    int64_t column_number = 
reinterpret_cast<int64_t>(spark_row_info->getNumCols());
-    int64_t total_size = 
reinterpret_cast<int64_t>(spark_row_info->getTotalBytes());
-
-    jobject spark_row_info_object
-        = env->NewObject(spark_row_info_class, spark_row_info_constructor, 
offsets_arr, lengths_arr, address, column_number, total_size);
-    return spark_row_info_object;
-    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
-}
-
-JNIEXPORT void Java_org_apache_gluten_row_RowIterator_nativeClose(JNIEnv * 
env, jobject /*obj*/, jlong executor_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
-    delete executor;
-    LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
 // Columnar Iterator
 JNIEXPORT jboolean 
Java_org_apache_gluten_vectorized_BatchIterator_nativeHasNext(JNIEnv * env, 
jobject /*obj*/, jlong executor_address)
 {
@@ -346,15 +308,24 @@ JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
     DB::Block * column_batch = executor->nextColumnar();
-    // LOG_DEBUG(&Poco::Logger::get("jni"), "row size of the column batch: 
{}", column_batch->rows());
     return reinterpret_cast<Int64>(column_batch);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
+JNIEXPORT void 
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env, 
jobject /*obj*/, jlong executor_address)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+    executor->cancel();
+    LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
+    LOCAL_ENGINE_JNI_METHOD_END(env, )
+}
+
 JNIEXPORT void 
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env, 
jobject /*obj*/, jlong executor_address)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::LocalExecutor * executor = 
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+    LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}", 
reinterpret_cast<uintptr_t>(executor));
     delete executor;
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
@@ -372,21 +343,6 @@ JNIEXPORT jobject 
Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMet
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
 }
 
-JNIEXPORT void
-Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeSetJavaTmpDir(JNIEnv
 * /*env*/, jobject /*obj*/, jstring /*dir*/)
-{
-}
-
-JNIEXPORT void
-Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeSetBatchSize(JNIEnv
 * /*env*/, jobject /*obj*/, jint /*batch_size*/)
-{
-}
-
-JNIEXPORT void 
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMetricsTime(
-    JNIEnv * /*env*/, jobject /*obj*/, jboolean /*setMetricsTime*/)
-{
-}
-
 JNIEXPORT jboolean
 Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env, 
jobject obj, jlong block_address, jint column_position)
 {
@@ -603,52 +559,6 @@ JNIEXPORT void 
Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIE
     LOCAL_ENGINE_JNI_METHOD_END(env, )
 }
 
-JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv
 * env, jobject /*obj*/, jint buf_size)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::BlockCoalesceOperator * instance = new 
local_engine::BlockCoalesceOperator(buf_size);
-    return reinterpret_cast<jlong>(instance);
-    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
-}
-
-JNIEXPORT void 
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeMergeBlock(
-    JNIEnv * env, jobject /*obj*/, jlong instance_address, jlong block_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::BlockCoalesceOperator * instance = 
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
-    DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
-    auto new_block = DB::Block(*block);
-    instance->mergeBlock(new_block);
-    LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
-JNIEXPORT jboolean 
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env, 
jobject /*obj*/, jlong instance_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::BlockCoalesceOperator * instance = 
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
-    bool full = instance->isFull();
-    return full ? JNI_TRUE : JNI_FALSE;
-    LOCAL_ENGINE_JNI_METHOD_END(env, false)
-}
-
-JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv * 
env, jobject /*obj*/, jlong instance_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::BlockCoalesceOperator * instance = 
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
-    auto * block = instance->releaseBlock();
-    Int64 address = reinterpret_cast<jlong>(block);
-    return address;
-    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
-}
-
-JNIEXPORT void 
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env, 
jobject /*obj*/, jlong instance_address)
-{
-    LOCAL_ENGINE_JNI_METHOD_START
-    local_engine::BlockCoalesceOperator * instance = 
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
-    delete instance;
-    LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
 // Splitter Jni Wrapper
 JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
     JNIEnv * env,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to