This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 944d85900 [GLUTEN-5787][CH]Make pipeline and shuffle exit gracefully
when tasks in executors are killed or interrupted (#5839)
944d85900 is described below
commit 944d859004a83617a463b7e1a17bac378d5a1d08
Author: 李扬 <[email protected]>
AuthorDate: Thu Jun 6 11:18:47 2024 +0800
[GLUTEN-5787][CH]Make pipeline and shuffle exit gracefully when tasks in
executors are killed or interrupted (#5839)
What changes were proposed in this pull request?
Changes:
Clean code: remove useless JNIs and classes under cpp-ch
Support cancel for all gluten processors. It was triggered when task is
killed or shut down.
Make sure offheap memory free does not throw exception. Ref:
https://zhuanlan.zhihu.com/p/65454580
(Fixes: #5787 #5823)
How was this patch tested?
Manual
---
.../apache/gluten/vectorized/BatchIterator.java | 11 ++
.../vectorized/CHNativeExpressionEvaluator.java | 10 +-
.../backendsapi/clickhouse/CHIteratorApi.scala | 21 +++-
.../execution/NativeFileScanColumnarRDD.scala | 12 ++-
.../metrics/GlutenClickHouseMetricsUTUtils.scala | 4 +-
cpp-ch/local-engine/Common/BlockIterator.cpp | 6 ++
cpp-ch/local-engine/Common/QueryContext.cpp | 2 +-
.../Operator/BlockCoalesceOperator.cpp | 53 ----------
.../local-engine/Operator/BlockCoalesceOperator.h | 47 ---------
.../Operator/BlocksBufferPoolTransform.cpp | 2 +-
cpp-ch/local-engine/Operator/EmptyProjectStep.cpp | 2 +-
cpp-ch/local-engine/Operator/ExpandTransform.cpp | 12 ++-
.../Operator/GraceMergingAggregatedStep.cpp | 6 +-
.../Operator/PartitionColumnFillingTransform.h | 4 +-
.../Operator/StreamingAggregatingStep.cpp | 7 +-
.../local-engine/Parser/SerializedPlanParser.cpp | 7 ++
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 15 +--
.../local-engine/Storages/SourceFromJavaIter.cpp | 15 +--
.../SubstraitSource/SubstraitFileSource.cpp | 23 ++++-
.../Storages/SubstraitSource/SubstraitFileSource.h | 26 ++++-
.../jni/ReservationListenerWrapper.cpp | 7 ++
.../local-engine/jni/ReservationListenerWrapper.h | 2 +
cpp-ch/local-engine/jni/jni_common.h | 27 +++++
cpp-ch/local-engine/jni/jni_error.h | 3 +-
cpp-ch/local-engine/local_engine_jni.cpp | 112 ++-------------------
25 files changed, 190 insertions(+), 246 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
index 5698caf02..d674c6e90 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
@@ -23,9 +23,11 @@ import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
public class BatchIterator extends GeneralOutIterator {
private final long handle;
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
public BatchIterator(long handle) {
super();
@@ -46,6 +48,8 @@ public class BatchIterator extends GeneralOutIterator {
private native void nativeClose(long nativeHandle);
+ private native void nativeCancel(long nativeHandle);
+
private native IMetrics nativeFetchMetrics(long nativeHandle);
@Override
@@ -76,4 +80,11 @@ public class BatchIterator extends GeneralOutIterator {
public void closeInternal() {
nativeClose(handle);
}
+
+ // Used to cancel native pipeline execution when spark task is killed
+ public final void cancel() {
+ if (cancelled.compareAndSet(false, true)) {
+ nativeCancel(handle);
+ }
+ }
}
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
index 0d307d231..b8b4138dc 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
@@ -83,7 +83,7 @@ public class CHNativeExpressionEvaluator {
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
- public GeneralOutIterator createKernelWithBatchIterator(
+ public BatchIterator createKernelWithBatchIterator(
byte[] wsPlan,
byte[][] splitInfo,
List<GeneralInIterator> iterList,
@@ -97,11 +97,11 @@ public class CHNativeExpressionEvaluator {
iterList.toArray(new GeneralInIterator[0]),
buildNativeConf(getNativeBackendConf()),
materializeInput);
- return createOutIterator(handle);
+ return createBatchIterator(handle);
}
// Only for UT.
- public GeneralOutIterator createKernelWithBatchIterator(
+ public BatchIterator createKernelWithBatchIterator(
long allocId, byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator>
iterList) {
long handle =
jniWrapper.nativeCreateKernelWithIterator(
@@ -111,10 +111,10 @@ public class CHNativeExpressionEvaluator {
iterList.toArray(new GeneralInIterator[0]),
buildNativeConf(getNativeBackendConf()),
false);
- return createOutIterator(handle);
+ return createBatchIterator(handle);
}
- private GeneralOutIterator createOutIterator(long nativeHandle) {
+ private BatchIterator createBatchIterator(long nativeHandle) {
return new BatchIterator(nativeHandle);
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index bc16c2d77..63f7eeb79 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.LogLevelUtil
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}
+import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
CloseableCHColumnBatchIterator, GeneralInIterator}
import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
@@ -206,13 +206,19 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
val splitInfoByteArray = inputPartition
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
- val resIter: GeneralOutIterator =
+ val resIter =
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
inBatchIters,
false)
+ context.addTaskFailureListener(
+ (ctx, _) => {
+ if (ctx.isInterrupted()) {
+ resIter.cancel()
+ }
+ })
context.addTaskCompletionListener[Unit](_ => resIter.close())
val iter = new Iterator[Any] {
private val inputMetrics = context.taskMetrics().inputMetrics
@@ -304,6 +310,7 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
}
}
var closed = false
+ val cancelled = false
def close(): Unit = {
closed = true
@@ -311,6 +318,16 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
// relationHolder.clear()
}
+ def cancel(): Unit = {
+ nativeIterator.cancel()
+ }
+
+ context.addTaskFailureListener(
+ (ctx, _) => {
+ if (ctx.isInterrupted()) {
+ cancel()
+ }
+ })
context.addTaskCompletionListener[Unit](_ => close())
new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
index 624a4390d..af512934b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/NativeFileScanColumnarRDD.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}
+import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
CloseableCHColumnBatchIterator, GeneralInIterator}
import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.rdd.RDD
@@ -47,7 +47,7 @@ class NativeFileScanColumnarRDD(
.asInstanceOf[GlutenPartition]
.splitInfosByteArray
- val resIter: GeneralOutIterator = GlutenTimeMetric.millis(scanTime) {
+ val resIter = GlutenTimeMetric.millis(scanTime) {
_ =>
val transKernel = new CHNativeExpressionEvaluator()
val inBatchIters = new util.ArrayList[GeneralInIterator]()
@@ -58,6 +58,14 @@ class NativeFileScanColumnarRDD(
false
)
}
+ TaskContext
+ .get()
+ .addTaskFailureListener(
+ (ctx, _) => {
+ if (ctx.isInterrupted()) {
+ resIter.cancel()
+ }
+ })
TaskContext.get().addTaskCompletionListener[Unit](_ => resIter.close())
val iter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
var scanTotalTime = 0L
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
index bc395ca88..ee0ad8039 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseMetricsUTUtils.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators
import org.apache.gluten.metrics.{MetricsUtil, NativeMetrics}
import org.apache.gluten.utils.SubstraitPlanPrinterUtil
-import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
GeneralInIterator, GeneralOutIterator}
+import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator,
GeneralInIterator}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -47,7 +47,7 @@ object GlutenClickHouseMetricsUTUtils {
val transKernel = new CHNativeExpressionEvaluator()
val mockMemoryAllocator = CHNativeMemoryAllocators.contextInstanceForUT()
- val resIter: GeneralOutIterator =
transKernel.createKernelWithBatchIterator(
+ val resIter = transKernel.createKernelWithBatchIterator(
mockMemoryAllocator.getNativeInstanceId,
substraitPlan.toByteArray,
new Array[Array[Byte]](0),
diff --git a/cpp-ch/local-engine/Common/BlockIterator.cpp
b/cpp-ch/local-engine/Common/BlockIterator.cpp
index 1a76f646b..464701893 100644
--- a/cpp-ch/local-engine/Common/BlockIterator.cpp
+++ b/cpp-ch/local-engine/Common/BlockIterator.cpp
@@ -34,24 +34,30 @@ void local_engine::BlockIterator::checkNextValid()
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Block iterator
next should after hasNext");
}
}
+
void BlockIterator::produce()
{
consumed = false;
}
+
void BlockIterator::consume()
{
consumed = true;
}
+
bool BlockIterator::isConsumed() const
{
return consumed;
}
+
DB::Block & BlockIterator::currentBlock()
{
return cached_block;
}
+
void BlockIterator::setCurrentBlock(DB::Block & block)
{
cached_block = block;
}
+
}
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp
b/cpp-ch/local-engine/Common/QueryContext.cpp
index c659e6f34..f4d39c612 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -67,7 +67,7 @@ int64_t initializeQuery(ReservationListenerWrapperPtr
listener)
else
listener->reserve(size);
};
- CurrentMemoryTracker::before_free = [listener](Int64 size) -> void {
listener->free(size); };
+ CurrentMemoryTracker::before_free = [listener](Int64 size) -> void {
listener->tryFree(size); };
CurrentMemoryTracker::current_memory = [listener]() -> Int64 { return
listener->currentMemory(); };
allocator_map.insert(allocator_id, allocator_context);
return allocator_id;
diff --git a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp
b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp
deleted file mode 100644
index 756249e8a..000000000
--- a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "BlockCoalesceOperator.h"
-#include <Core/Block.h>
-
-namespace local_engine
-{
-
-void BlockCoalesceOperator::mergeBlock(DB::Block & block)
-{
- block_buffer.add(block, 0, static_cast<int>(block.rows()));
-}
-
-bool BlockCoalesceOperator::isFull()
-{
- return block_buffer.size() >= buf_size;
-}
-
-DB::Block * BlockCoalesceOperator::releaseBlock()
-{
- clearCache();
- cached_block = new DB::Block(block_buffer.releaseColumns());
- return cached_block;
-}
-
-BlockCoalesceOperator::~BlockCoalesceOperator()
-{
- clearCache();
-}
-
-void BlockCoalesceOperator::clearCache()
-{
- if (cached_block)
- {
- delete cached_block;
- cached_block = nullptr;
- }
-}
-}
diff --git a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h
b/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h
deleted file mode 100644
index 2b67b40ce..000000000
--- a/cpp-ch/local-engine/Operator/BlockCoalesceOperator.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <Shuffle/ShuffleSplitter.h>
-
-namespace DB
-{
-class Block;
-}
-
-namespace local_engine
-{
-
-class BlockCoalesceOperator
-{
-public:
- explicit BlockCoalesceOperator(size_t buf_size_) : buf_size(buf_size_) { }
- ~BlockCoalesceOperator();
-
- void mergeBlock(DB::Block & block);
- bool isFull();
- DB::Block * releaseBlock();
-
-private:
- void clearCache();
-
- size_t buf_size;
- ColumnsBuffer block_buffer;
- DB::Block * cached_block = nullptr;
-
-};
-}
diff --git a/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
b/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
index 3427a81c6..16a5bd5d2 100644
--- a/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
+++ b/cpp-ch/local-engine/Operator/BlocksBufferPoolTransform.cpp
@@ -43,7 +43,7 @@ DB::IProcessor::Status BlocksBufferPoolTransform::prepare()
{
auto & output = outputs.front();
auto & input = inputs.front();
- if (output.isFinished())
+ if (output.isFinished() || isCancelled())
{
input.close();
return Status::Finished;
diff --git a/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
b/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
index 58cb33e59..62991585f 100644
--- a/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
+++ b/cpp-ch/local-engine/Operator/EmptyProjectStep.cpp
@@ -39,7 +39,7 @@ public:
{
auto & output = outputs.front();
auto & input = inputs.front();
- if (output.isFinished())
+ if (output.isFinished() || isCancelled())
{
input.close();
return Status::Finished;
diff --git a/cpp-ch/local-engine/Operator/ExpandTransform.cpp
b/cpp-ch/local-engine/Operator/ExpandTransform.cpp
index d48d48439..106c38e2d 100644
--- a/cpp-ch/local-engine/Operator/ExpandTransform.cpp
+++ b/cpp-ch/local-engine/Operator/ExpandTransform.cpp
@@ -48,7 +48,7 @@ ExpandTransform::Status ExpandTransform::prepare()
auto & output = outputs.front();
auto & input = inputs.front();
- if (output.isFinished())
+ if (output.isFinished() || isCancelled())
{
input.close();
return Status::Finished;
@@ -79,12 +79,12 @@ ExpandTransform::Status ExpandTransform::prepare()
if (!input.hasData())
return Status::NeedData;
-
+
input_chunk = input.pull(true);
has_input = true;
expand_expr_iterator = 0;
}
-
+
return Status::Ready;
}
@@ -92,6 +92,7 @@ void ExpandTransform::work()
{
if (expand_expr_iterator >= project_set_exprs.getExpandRows())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"expand_expr_iterator >= project_set_exprs.getExpandRows()");
+
const auto & original_cols = input_chunk.getColumns();
size_t rows = input_chunk.getNumRows();
DB::Columns cols;
@@ -139,8 +140,9 @@ void ExpandTransform::work()
}
}
output_chunk = DB::Chunk(cols, rows);
- expand_expr_iterator += 1;
- has_output = expand_expr_iterator <= project_set_exprs.getExpandRows();
+ has_output = true;
+
+ ++expand_expr_iterator;
has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
}
}
diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
index ca86785fe..a9a2df276 100644
--- a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
+++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
@@ -146,7 +146,7 @@ GraceMergingAggregatedTransform::Status
GraceMergingAggregatedTransform::prepare
{
auto & output = outputs.front();
auto & input = inputs.front();
- if (output.isFinished())
+ if (output.isFinished() || isCancelled())
{
input.close();
return Status::Finished;
@@ -224,7 +224,7 @@ void GraceMergingAggregatedTransform::work()
block_converter =
prepareBucketOutputBlocks(current_bucket_index);
if (block_converter)
break;
- current_bucket_index++;
+ current_bucket_index++;
}
}
if (!block_converter)
@@ -455,7 +455,7 @@ std::unique_ptr<AggregateDataBlockConverter>
GraceMergingAggregatedTransform::pr
block = {};
}
}
-
+
if (buffer_file_stream.original_file_stream)
{
buffer_file_stream.original_file_stream->finishWriting();
diff --git a/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
b/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
index 2eac10f39..692991b3f 100644
--- a/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
+++ b/cpp-ch/local-engine/Operator/PartitionColumnFillingTransform.h
@@ -25,9 +25,11 @@ class PartitionColumnFillingTransform : public
DB::ISimpleTransform
public:
PartitionColumnFillingTransform(
const DB::Block & input_, const DB::Block & output_, const String &
partition_col_name_, const String & partition_col_value_);
- void transform(DB::Chunk & chunk) override;
+
String getName() const override { return
"PartitionColumnFillingTransform"; }
+ void transform(DB::Chunk & chunk) override;
+
private:
DB::ColumnPtr createPartitionColumn();
diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
index 698d353b1..65d77f8e9 100644
--- a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
+++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp
@@ -67,11 +67,12 @@ StreamingAggregatingTransform::Status
StreamingAggregatingTransform::prepare()
{
auto & output = outputs.front();
auto & input = inputs.front();
- if (output.isFinished())
+ if (output.isFinished() || isCancelled())
{
input.close();
return Status::Finished;
}
+
if (has_output)
{
if (output.canPush())
@@ -140,10 +141,10 @@ bool StreamingAggregatingTransform::needEvict()
auto max_mem_used =
static_cast<size_t>(context->getSettingsRef().max_memory_usage *
max_allowed_memory_usage_ratio);
auto current_result_rows = data_variants->size();
- /// avoid evict empty or too small aggregated results.
+ /// avoid evict empty or too small aggregated results.
if (current_result_rows < aggregated_keys_before_evict)
return false;
-
+
/// If the grouping keys is high cardinality, we should evict data
variants early, and avoid to use a big
/// hash table.
if (static_cast<double>(total_output_rows)/total_input_rows >
high_cardinality_threshold)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index c2bcd1a36..1c907035a 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -2043,6 +2043,7 @@ LocalExecutor::~LocalExecutor()
{
if (context->getConfigRef().getBool("dump_pipeline", false))
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Dump pipeline:\n{}",
dumpPipeline());
+
if (spark_buffer)
{
ch_column_to_spark_row->freeMem(spark_buffer->address,
spark_buffer->size);
@@ -2166,6 +2167,12 @@ Block * LocalExecutor::nextColumnar()
return columnar_batch;
}
+void LocalExecutor::cancel()
+{
+ if (executor)
+ executor->cancel();
+}
+
Block & LocalExecutor::getHeader()
{
return header;
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 73448b069..d0a16ec71 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -402,21 +402,26 @@ class LocalExecutor : public BlockIterator
public:
LocalExecutor() = default;
explicit LocalExecutor(ContextPtr context);
+ ~LocalExecutor();
+
void execute(QueryPlanPtr query_plan);
SparkRowInfoPtr next();
Block * nextColumnar();
bool hasNext();
- ~LocalExecutor();
- Block & getHeader();
+ /// Stop execution, used when task receives shutdown command or executor
receives SIGTERM signal
+ void cancel();
+ Block & getHeader();
RelMetricPtr getMetric() const { return metric; }
void setMetric(RelMetricPtr metric_) { metric = metric_; }
-
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) {
extra_plan_holder = std::move(extra_plan_holder_); }
-
private:
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
+
+ /// Dump processor runtime information to log
+ std::string dumpPipeline();
+
QueryPipeline query_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
Block header;
@@ -427,8 +432,6 @@ private:
RelMetricPtr metric;
std::vector<QueryPlanPtr> extra_plan_holder;
- /// Dump processor runtime information to log
- std::string dumpPipeline();
};
diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
index 54d1d253e..37501e985 100644
--- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
+++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
@@ -53,12 +53,11 @@ static DB::Block getRealHeader(const DB::Block & header)
DB::Block * SourceFromJavaIter::peekBlock(JNIEnv * env, jobject java_iter)
{
jboolean has_next = safeCallBooleanMethod(env, java_iter,
serialized_record_batch_iterator_hasNext);
- if (has_next)
- {
- jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env,
java_iter, serialized_record_batch_iterator_next));
- return reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
- }
- return nullptr;
+ if (!has_next)
+ return nullptr;
+
+ jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env,
java_iter, serialized_record_batch_iterator_next));
+ return reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
}
@@ -75,6 +74,9 @@ SourceFromJavaIter::SourceFromJavaIter(
DB::Chunk SourceFromJavaIter::generate()
{
+ if (isCancelled())
+ return {};
+
GET_JNIENV(env)
SCOPE_EXIT({CLEAN_JNIENV});
@@ -152,6 +154,7 @@ void SourceFromJavaIter::convertNullable(DB::Chunk & chunk)
chunk.setColumns(columns, rows);
}
+
DB::ColumnPtr SourceFromJavaIter::convertNestedNullable(const DB::ColumnPtr &
column, const DB::DataTypePtr & target_type)
{
DB::WhichDataType column_type(column->getDataType());
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
index 80dccf759..5b872244e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp
@@ -104,6 +104,9 @@ DB::Chunk SubstraitFileSource::generate()
bool SubstraitFileSource::tryPrepareReader()
{
+ if (isCancelled())
+ return false;
+
if (file_reader)
return true;
@@ -140,6 +143,13 @@ bool SubstraitFileSource::tryPrepareReader()
return true;
}
+
+void SubstraitFileSource::onCancel()
+{
+ if (file_reader)
+ file_reader->cancel();
+}
+
DB::ColumnPtr FileReaderWrapper::createConstColumn(DB::DataTypePtr data_type,
const DB::Field & field, size_t rows)
{
auto nested_type = DB::removeNullable(data_type);
@@ -280,9 +290,13 @@
ConstColumnsFileReader::ConstColumnsFileReader(FormatFilePtr file_, DB::ContextP
remained_rows = *rows;
}
+
bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
{
- if (!remained_rows) [[unlikely]]
+ if (isCancelled())
+ return false;
+
+ if (!remained_rows)
return false;
size_t to_read_rows = 0;
@@ -296,6 +310,7 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
to_read_rows = block_size;
remained_rows -= block_size;
}
+
DB::Columns res_columns;
if (const size_t col_num = header.columns())
{
@@ -307,8 +322,9 @@ bool ConstColumnsFileReader::pull(DB::Chunk & chunk)
auto type = col_with_name_and_type.type;
const auto & name = col_with_name_and_type.name;
auto it = partition_values.find(name);
- if (it == partition_values.end()) [[unlikely]]
+ if (it == partition_values.end())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknow
partition column : {}", name);
+
res_columns.emplace_back(createColumn(it->second, type,
to_read_rows));
}
}
@@ -331,6 +347,9 @@ NormalFileReader::NormalFileReader(
bool NormalFileReader::pull(DB::Chunk & chunk)
{
+ if (isCancelled())
+ return false;
+
DB::Chunk raw_chunk = input_format->input->generate();
const size_t rows = raw_chunk.getNumRows();
if (!rows)
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index 973f3cd35..650ec5d96 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -34,6 +34,15 @@ public:
virtual ~FileReaderWrapper() = default;
virtual bool pull(DB::Chunk & chunk) = 0;
+ void cancel()
+ {
+ bool already_cancelled = is_cancelled.exchange(true,
std::memory_order_acq_rel);
+ if (!already_cancelled)
+ onCancel();
+ }
+
+ bool isCancelled() const { return
is_cancelled.load(std::memory_order_acquire); }
+
/// Apply key condition to the reader, if use_local_format is true,
column_index_filter will be used
/// otherwise it will be ignored
virtual void applyKeyCondition(
@@ -42,7 +51,11 @@ public:
}
protected:
+ virtual void onCancel() {};
+
FormatFilePtr file;
+ std::atomic<bool> is_cancelled{false};
+
static DB::ColumnPtr createConstColumn(DB::DataTypePtr type, const
DB::Field & field, size_t rows);
static DB::ColumnPtr createColumn(const String & value, DB::DataTypePtr
type, size_t rows);
@@ -68,10 +81,14 @@ public:
}
private:
+ void onCancel() override
+ {
+ input_format->input->cancel();
+ }
+
DB::ContextPtr context;
DB::Block to_read_header;
DB::Block output_header;
-
FormatFile::InputFormatPtr input_format;
};
@@ -89,6 +106,7 @@ public:
ConstColumnsFileReader(
FormatFilePtr file_, DB::ContextPtr context_, const DB::Block &
header_, size_t block_size_ = DB::DEFAULT_BLOCK_SIZE);
~ConstColumnsFileReader() override = default;
+
bool pull(DB::Chunk & chunk) override;
private:
@@ -112,6 +130,9 @@ protected:
DB::Chunk generate() override;
private:
+ bool tryPrepareReader();
+ void onCancel() override;
+
DB::ContextPtr context;
DB::Block output_header; /// Sample header may contains partitions keys
DB::Block to_read_header; // Sample header not include partition keys
@@ -120,9 +141,6 @@ private:
UInt32 current_file_index = 0;
std::unique_ptr<FileReaderWrapper> file_reader;
ReadBufferBuilderPtr read_buffer_builder;
-
ColumnIndexFilterPtr column_index_filter;
-
- bool tryPrepareReader();
};
}
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
index 65b29c2a2..dad0ecf66 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
@@ -58,6 +58,13 @@ void ReservationListenerWrapper::free(int64_t size)
CLEAN_JNIENV
}
+void ReservationListenerWrapper::tryFree(int64_t size)
+{
+ GET_JNIENV(env)
+ tryCallVoidMethod(env, listener, reservation_listener_unreserve, size);
+ CLEAN_JNIENV
+}
+
size_t ReservationListenerWrapper::currentMemory()
{
GET_JNIENV(env)
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
index 1dfb3671f..a4d26cb54 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
@@ -35,6 +35,8 @@ public:
void reserve(int64_t size);
void reserveOrThrow(int64_t size);
void free(int64_t size);
+ /// Make sure destructors in CH Backend do not throw exceptions
+ void tryFree(int64_t size);
size_t currentMemory();
diff --git a/cpp-ch/local-engine/jni/jni_common.h
b/cpp-ch/local-engine/jni/jni_common.h
index c1cc805aa..8d1437083 100644
--- a/cpp-ch/local-engine/jni/jni_common.h
+++ b/cpp-ch/local-engine/jni/jni_common.h
@@ -28,6 +28,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
+ extern const int UNKNOWN_TYPE;
}
}
@@ -62,6 +63,24 @@ jbyteArray stringTojbyteArray(JNIEnv * env, const
std::string & str);
throw DB::Exception::createRuntime(DB::ErrorCodes::LOGICAL_ERROR,
msg); \
}
+#define TRY_LOCAL_ENGINE_JNI_JMETHOD_START
+#define TRY_LOCAL_ENGINE_JNI_JMETHOD_END(env) \
+ if ((env)->ExceptionCheck()) \
+ { \
+ LOG_ERROR(&Poco::Logger::get("local_engine"), "Enter java exception
handle."); \
+ auto excp = (env)->ExceptionOccurred(); \
+ (env)->ExceptionDescribe(); \
+ (env)->ExceptionClear(); \
+ jclass cls = (env)->GetObjectClass(excp); \
+ jmethodID mid = env->GetMethodID(cls, "toString",
"()Ljava/lang/String;"); \
+ jstring jmsg = static_cast<jstring>((env)->CallObjectMethod(excp,
mid)); \
+ const char * nmsg = (env)->GetStringUTFChars(jmsg, NULL); \
+ std::string msg = std::string(nmsg); \
+ env->ReleaseStringUTFChars(jmsg, nmsg); \
+ LOG_WARNING(&Poco::Logger::get("local_engine"), "Ignore java
exception: {}", msg); \
+ }
+
+
template <typename... Args>
jobject safeCallObjectMethod(JNIEnv * env, jobject obj, jmethodID method_id,
Args... args)
{
@@ -106,6 +125,14 @@ void safeCallVoidMethod(JNIEnv * env, jobject obj,
jmethodID method_id, Args...
LOCAL_ENGINE_JNI_JMETHOD_END(env)
}
+template <typename... Args>
+void tryCallVoidMethod(JNIEnv * env, jobject obj, jmethodID method_id, Args...
args)
+{
+ TRY_LOCAL_ENGINE_JNI_JMETHOD_START
+ env->CallVoidMethod(obj, method_id, args...);
+ TRY_LOCAL_ENGINE_JNI_JMETHOD_END(env)
+}
+
template <typename... Args>
jlong safeCallStaticLongMethod(JNIEnv * env, jclass clazz, jmethodID
method_id, Args... args)
{
diff --git a/cpp-ch/local-engine/jni/jni_error.h
b/cpp-ch/local-engine/jni/jni_error.h
index 216a5da93..c6f46bc8f 100644
--- a/cpp-ch/local-engine/jni/jni_error.h
+++ b/cpp-ch/local-engine/jni/jni_error.h
@@ -37,6 +37,8 @@ public:
~JniErrorsGlobalState() = default;
static JniErrorsGlobalState & instance();
+ static void throwException(JNIEnv * env, jclass exception_class, const
std::string & message, const std::string & stack_trace = "");
+
void initialize(JNIEnv * env_);
void destroy(JNIEnv * env);
@@ -48,7 +50,6 @@ public:
void throwException(JNIEnv * env, const DB::Exception & e);
void throwException(JNIEnv * env, const std::exception & e);
- static void throwException(JNIEnv * env, jclass exception_class, const
std::string & message, const std::string & stack_trace = "");
void throwRuntimeException(JNIEnv * env, const std::string & message,
const std::string & stack_trace = "");
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 1c088720d..be28b9fab 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -22,7 +22,6 @@
#include <Compression/CompressedReadBuffer.h>
#include <DataTypes/DataTypeNullable.h>
#include <Join/BroadCastJoinBuilder.h>
-#include <Operator/BlockCoalesceOperator.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/MergeTreeRelParser.h>
#include <Parser/RelParser.h>
@@ -285,6 +284,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
plan_string.assign(reinterpret_cast<const char *>(plan_address),
plan_size);
auto query_plan = parser.parse(plan_string);
local_engine::LocalExecutor * executor = new
local_engine::LocalExecutor(query_context);
+ LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
executor->execute(std::move(query_plan));
@@ -294,44 +294,6 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
-JNIEXPORT jboolean Java_org_apache_gluten_row_RowIterator_nativeHasNext(JNIEnv
* env, jobject /*obj*/, jlong executor_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
- return executor->hasNext();
- LOCAL_ENGINE_JNI_METHOD_END(env, false)
-}
-
-JNIEXPORT jobject Java_org_apache_gluten_row_RowIterator_nativeNext(JNIEnv *
env, jobject /*obj*/, jlong executor_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
- local_engine::SparkRowInfoPtr spark_row_info = executor->next();
-
- auto * offsets_arr = env->NewLongArray(spark_row_info->getNumRows());
- const auto * offsets_src = reinterpret_cast<const jlong
*>(spark_row_info->getOffsets().data());
- env->SetLongArrayRegion(offsets_arr, 0, spark_row_info->getNumRows(),
offsets_src);
- auto * lengths_arr = env->NewLongArray(spark_row_info->getNumRows());
- const auto * lengths_src = reinterpret_cast<const jlong
*>(spark_row_info->getLengths().data());
- env->SetLongArrayRegion(lengths_arr, 0, spark_row_info->getNumRows(),
lengths_src);
- int64_t address =
reinterpret_cast<int64_t>(spark_row_info->getBufferAddress());
- int64_t column_number =
reinterpret_cast<int64_t>(spark_row_info->getNumCols());
- int64_t total_size =
reinterpret_cast<int64_t>(spark_row_info->getTotalBytes());
-
- jobject spark_row_info_object
- = env->NewObject(spark_row_info_class, spark_row_info_constructor,
offsets_arr, lengths_arr, address, column_number, total_size);
- return spark_row_info_object;
- LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
-}
-
-JNIEXPORT void Java_org_apache_gluten_row_RowIterator_nativeClose(JNIEnv *
env, jobject /*obj*/, jlong executor_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
- delete executor;
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
// Columnar Iterator
JNIEXPORT jboolean
Java_org_apache_gluten_vectorized_BatchIterator_nativeHasNext(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
{
@@ -346,15 +308,24 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_BatchIterator_nativeCHNext(JNI
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
DB::Block * column_batch = executor->nextColumnar();
- // LOG_DEBUG(&Poco::Logger::get("jni"), "row size of the column batch:
{}", column_batch->rows());
return reinterpret_cast<Int64>(column_batch);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
+JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeCancel(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+ executor->cancel();
+ LOG_INFO(&Poco::Logger::get("jni"), "Cancel LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
+ LOCAL_ENGINE_JNI_METHOD_END(env, )
+}
+
JNIEXPORT void
Java_org_apache_gluten_vectorized_BatchIterator_nativeClose(JNIEnv * env,
jobject /*obj*/, jlong executor_address)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::LocalExecutor * executor =
reinterpret_cast<local_engine::LocalExecutor *>(executor_address);
+ LOG_INFO(&Poco::Logger::get("jni"), "Finalize LocalExecutor {}",
reinterpret_cast<uintptr_t>(executor));
delete executor;
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
@@ -372,21 +343,6 @@ JNIEXPORT jobject
Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMet
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
-JNIEXPORT void
-Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeSetJavaTmpDir(JNIEnv
* /*env*/, jobject /*obj*/, jstring /*dir*/)
-{
-}
-
-JNIEXPORT void
-Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeSetBatchSize(JNIEnv
* /*env*/, jobject /*obj*/, jint /*batch_size*/)
-{
-}
-
-JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMetricsTime(
- JNIEnv * /*env*/, jobject /*obj*/, jboolean /*setMetricsTime*/)
-{
-}
-
JNIEXPORT jboolean
Java_org_apache_gluten_vectorized_CHColumnVector_nativeHasNull(JNIEnv * env,
jobject obj, jlong block_address, jint column_position)
{
@@ -603,52 +559,6 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIE
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
-JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHCoalesceOperator_createNativeOperator(JNIEnv
* env, jobject /*obj*/, jint buf_size)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::BlockCoalesceOperator * instance = new
local_engine::BlockCoalesceOperator(buf_size);
- return reinterpret_cast<jlong>(instance);
- LOCAL_ENGINE_JNI_METHOD_END(env, -1)
-}
-
-JNIEXPORT void
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeMergeBlock(
- JNIEnv * env, jobject /*obj*/, jlong instance_address, jlong block_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::BlockCoalesceOperator * instance =
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
- DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
- auto new_block = DB::Block(*block);
- instance->mergeBlock(new_block);
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
-JNIEXPORT jboolean
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeIsFull(JNIEnv * env,
jobject /*obj*/, jlong instance_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::BlockCoalesceOperator * instance =
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
- bool full = instance->isFull();
- return full ? JNI_TRUE : JNI_FALSE;
- LOCAL_ENGINE_JNI_METHOD_END(env, false)
-}
-
-JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeRelease(JNIEnv *
env, jobject /*obj*/, jlong instance_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::BlockCoalesceOperator * instance =
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
- auto * block = instance->releaseBlock();
- Int64 address = reinterpret_cast<jlong>(block);
- return address;
- LOCAL_ENGINE_JNI_METHOD_END(env, -1)
-}
-
-JNIEXPORT void
Java_org_apache_gluten_vectorized_CHCoalesceOperator_nativeClose(JNIEnv * env,
jobject /*obj*/, jlong instance_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- local_engine::BlockCoalesceOperator * instance =
reinterpret_cast<local_engine::BlockCoalesceOperator *>(instance_address);
- delete instance;
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
// Splitter Jni Wrapper
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_nativeMake(
JNIEnv * env,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]