This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a533a062d [VL] Remove unused code for sort based shuffle (#5826)
a533a062d is described below
commit a533a062dd788c4be934d582e32529b41e1ffe78
Author: Yang Zhang <[email protected]>
AuthorDate: Wed May 22 10:21:51 2024 +0800
[VL] Remove unused code for sort based shuffle (#5826)
---
cpp/core/CMakeLists.txt | 1 -
cpp/core/jni/JniWrapper.cc | 13 ----------
cpp/core/shuffle/Options.h | 2 +-
cpp/core/shuffle/rss/RemotePartitionWriter.cc | 20 ---------------
cpp/core/shuffle/rss/RemotePartitionWriter.h | 32 ------------------------
cpp/core/shuffle/rss/RssPartitionWriter.h | 6 ++---
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 6 +----
cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h | 12 ++-------
8 files changed, 7 insertions(+), 85 deletions(-)
diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index e2d312aba..dc9ce3435 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/Partitioner.cc
shuffle/Partitioning.cc
shuffle/Payload.cc
- shuffle/rss/RemotePartitionWriter.cc
shuffle/rss/RssPartitionWriter.cc
shuffle/RoundRobinPartitioner.cc
shuffle/ShuffleMemoryPool.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 6a1926317..b1edfbd01 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -850,19 +850,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
.startPartitionId = startPartitionId,
};
- jclass cls = env->FindClass("java/lang/Thread");
- jmethodID mid = env->GetStaticMethodID(cls, "currentThread",
"()Ljava/lang/Thread;");
- jobject thread = env->CallStaticObjectMethod(cls, mid);
- checkException(env);
- if (thread == NULL) {
- LOG(WARNING) << "Thread.currentThread() return NULL";
- } else {
- jmethodID midGetid = getMethodIdOrError(env, cls, "getId", "()J");
- jlong sid = env->CallLongMethod(thread, midGetid);
- checkException(env);
- shuffleWriterOptions.threadId = (int64_t)sid;
- }
-
auto partitionWriterOptions = PartitionWriterOptions{
.mergeBufferSize = mergeBufferSize,
.mergeThreshold = mergeThreshold,
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 4317ed631..4828c7c82 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -26,7 +26,7 @@ namespace gluten {
static constexpr int16_t kDefaultBatchSize = 4096;
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
-static constexpr int64_t kDefaultSortBufferThreshold = 64000000000;
+static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
diff --git a/cpp/core/shuffle/rss/RemotePartitionWriter.cc b/cpp/core/shuffle/rss/RemotePartitionWriter.cc
deleted file mode 100644
index 9993956b6..000000000
--- a/cpp/core/shuffle/rss/RemotePartitionWriter.cc
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RemotePartitionWriter.h"
-
-namespace gluten {} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RemotePartitionWriter.h b/cpp/core/shuffle/rss/RemotePartitionWriter.h
deleted file mode 100644
index 477166635..000000000
--- a/cpp/core/shuffle/rss/RemotePartitionWriter.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "shuffle/PartitionWriter.h"
-
-#include <arrow/memory_pool.h>
-
-namespace gluten {
-
-class RemotePartitionWriter : public PartitionWriter {
- public:
- explicit RemotePartitionWriter(uint32_t numPartitions,
PartitionWriterOptions options, arrow::MemoryPool* pool)
- : PartitionWriter(numPartitions, std::move(options), pool) {}
-};
-
-} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index b8cc1551c..d993aa9ea 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -20,20 +20,20 @@
#include <arrow/io/api.h>
#include <arrow/memory_pool.h>
-#include "shuffle/rss/RemotePartitionWriter.h"
+#include "shuffle/PartitionWriter.h"
#include "shuffle/rss/RssClient.h"
#include "utils/macros.h"
namespace gluten {
-class RssPartitionWriter final : public RemotePartitionWriter {
+class RssPartitionWriter final : public PartitionWriter {
public:
RssPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
arrow::MemoryPool* pool,
std::shared_ptr<RssClient> rssClient)
- : RemotePartitionWriter(numPartitions, std::move(options), pool),
rssClient_(rssClient) {
+ : PartitionWriter(numPartitions, std::move(options), pool),
rssClient_(rssClient) {
init();
}
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
index 2a6bca8c0..85fccf716 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -52,7 +52,6 @@ arrow::Status VeloxSortBasedShuffleWriter::init() {
partitioner_, Partitioner::make(options_.partitioning, numPartitions_,
options_.startPartitionId));
DLOG(INFO) << "Create partitioning type: " <<
std::to_string(options_.partitioning);
- partition2RowCount_.resize(numPartitions_);
rowVectorIndexMap_.reserve(numPartitions_);
for (auto pid = 0; pid < numPartitions_; ++pid) {
rowVectorIndexMap_[pid].reserve(options_.bufferSize);
@@ -68,7 +67,6 @@ arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr
if (currentInputColumnBytes_ > memLimit) {
for (auto pid = 0; pid < numPartitions(); ++pid) {
RETURN_NOT_OK(evictRowVector(pid));
- partition2RowCount_[pid] = 0;
}
batches_.clear();
currentInputColumnBytes_ = 0;
@@ -77,7 +75,7 @@ arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr
return arrow::Status::OK();
}
-arrow::Status
VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t
memLimit) {
+arrow::Status
VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t
/* memLimit */) {
if (options_.partitioning == Partitioning::kSingle) {
auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -199,7 +197,6 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
arrow::Status VeloxSortBasedShuffleWriter::stop() {
for (auto pid = 0; pid < numPartitions(); ++pid) {
RETURN_NOT_OK(evictRowVector(pid));
- partition2RowCount_[pid] = 0;
}
batches_.clear();
currentInputColumnBytes_ = 0;
@@ -236,7 +233,6 @@ arrow::Status VeloxSortBasedShuffleWriter::reclaimFixedSize(int64_t size, int64_
if (sortState_ == SortState::kSortInit) {
for (auto pid = 0; pid < numPartitions(); ++pid) {
RETURN_NOT_OK(evictRowVector(pid));
- partition2RowCount_[pid] = 0;
}
batches_.clear();
*actual = currentInputColumnBytes_;
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
index 417d5e926..bb50021fe 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -49,7 +49,7 @@ namespace gluten {
enum SortState { kSortInit, kSort, kSortStop };
-class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
+class VeloxSortBasedShuffleWriter final : public VeloxShuffleWriter {
public:
static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
uint32_t numPartitions,
@@ -81,7 +81,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
void setSortState(SortState state);
- arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit);
+ arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t /* memLimit
*/);
arrow::Status evictBatch(uint32_t partitionId);
@@ -92,12 +92,6 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
std::unique_ptr<BufferOutputStream> bufferOutputStream_;
- // Partition ID -> Row Count
- // subscript: Partition ID
- // value: How many rows does this partition have in the current input
RowVector
- // Updated for each input RowVector.
- std::vector<uint32_t> partition2RowCount_;
-
std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde>
serde_ =
std::make_unique<facebook::velox::serializer::presto::PrestoVectorSerde>();
@@ -105,8 +99,6 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
std::unordered_map<int32_t, std::vector<int64_t>> rowVectorIndexMap_;
- std::unordered_map<int32_t, std::vector<int64_t>> rowVectorPartitionMap_;
-
uint32_t currentInputColumnBytes_ = 0;
SortState sortState_{kSortInit};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]