This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a533a062d [VL] Remove unused code for sort based shuffle (#5826)
a533a062d is described below

commit a533a062dd788c4be934d582e32529b41e1ffe78
Author: Yang Zhang <[email protected]>
AuthorDate: Wed May 22 10:21:51 2024 +0800

    [VL] Remove unused code for sort based shuffle (#5826)
---
 cpp/core/CMakeLists.txt                          |  1 -
 cpp/core/jni/JniWrapper.cc                       | 13 ----------
 cpp/core/shuffle/Options.h                       |  2 +-
 cpp/core/shuffle/rss/RemotePartitionWriter.cc    | 20 ---------------
 cpp/core/shuffle/rss/RemotePartitionWriter.h     | 32 ------------------------
 cpp/core/shuffle/rss/RssPartitionWriter.h        |  6 ++---
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc |  6 +----
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h  | 12 ++-------
 8 files changed, 7 insertions(+), 85 deletions(-)

diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt
index e2d312aba..dc9ce3435 100644
--- a/cpp/core/CMakeLists.txt
+++ b/cpp/core/CMakeLists.txt
@@ -191,7 +191,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
         shuffle/Partitioner.cc
         shuffle/Partitioning.cc
         shuffle/Payload.cc
-        shuffle/rss/RemotePartitionWriter.cc
         shuffle/rss/RssPartitionWriter.cc
         shuffle/RoundRobinPartitioner.cc
         shuffle/ShuffleMemoryPool.cc
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 6a1926317..b1edfbd01 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -850,19 +850,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .startPartitionId = startPartitionId,
   };
 
-  jclass cls = env->FindClass("java/lang/Thread");
-  jmethodID mid = env->GetStaticMethodID(cls, "currentThread", "()Ljava/lang/Thread;");
-  jobject thread = env->CallStaticObjectMethod(cls, mid);
-  checkException(env);
-  if (thread == NULL) {
-    LOG(WARNING) << "Thread.currentThread() return NULL";
-  } else {
-    jmethodID midGetid = getMethodIdOrError(env, cls, "getId", "()J");
-    jlong sid = env->CallLongMethod(thread, midGetid);
-    checkException(env);
-    shuffleWriterOptions.threadId = (int64_t)sid;
-  }
-
   auto partitionWriterOptions = PartitionWriterOptions{
       .mergeBufferSize = mergeBufferSize,
       .mergeThreshold = mergeThreshold,
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 4317ed631..4828c7c82 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -26,7 +26,7 @@ namespace gluten {
 
 static constexpr int16_t kDefaultBatchSize = 4096;
 static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
-static constexpr int64_t kDefaultSortBufferThreshold = 64000000000;
+static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
 static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
 static constexpr int32_t kDefaultNumSubDirs = 64;
 static constexpr int32_t kDefaultCompressionThreshold = 100;
diff --git a/cpp/core/shuffle/rss/RemotePartitionWriter.cc b/cpp/core/shuffle/rss/RemotePartitionWriter.cc
deleted file mode 100644
index 9993956b6..000000000
--- a/cpp/core/shuffle/rss/RemotePartitionWriter.cc
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "RemotePartitionWriter.h"
-
-namespace gluten {} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RemotePartitionWriter.h b/cpp/core/shuffle/rss/RemotePartitionWriter.h
deleted file mode 100644
index 477166635..000000000
--- a/cpp/core/shuffle/rss/RemotePartitionWriter.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "shuffle/PartitionWriter.h"
-
-#include <arrow/memory_pool.h>
-
-namespace gluten {
-
-class RemotePartitionWriter : public PartitionWriter {
- public:
-  explicit RemotePartitionWriter(uint32_t numPartitions, PartitionWriterOptions options, arrow::MemoryPool* pool)
-      : PartitionWriter(numPartitions, std::move(options), pool) {}
-};
-
-} // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index b8cc1551c..d993aa9ea 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -20,20 +20,20 @@
 #include <arrow/io/api.h>
 #include <arrow/memory_pool.h>
 
-#include "shuffle/rss/RemotePartitionWriter.h"
+#include "shuffle/PartitionWriter.h"
 #include "shuffle/rss/RssClient.h"
 #include "utils/macros.h"
 
 namespace gluten {
 
-class RssPartitionWriter final : public RemotePartitionWriter {
+class RssPartitionWriter final : public PartitionWriter {
  public:
   RssPartitionWriter(
       uint32_t numPartitions,
       PartitionWriterOptions options,
       arrow::MemoryPool* pool,
       std::shared_ptr<RssClient> rssClient)
-      : RemotePartitionWriter(numPartitions, std::move(options), pool), rssClient_(rssClient) {
+      : PartitionWriter(numPartitions, std::move(options), pool), rssClient_(rssClient) {
     init();
   }
 
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
index 2a6bca8c0..85fccf716 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -52,7 +52,6 @@ arrow::Status VeloxSortBasedShuffleWriter::init() {
       partitioner_, Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId));
   DLOG(INFO) << "Create partitioning type: " << std::to_string(options_.partitioning);
 
-  partition2RowCount_.resize(numPartitions_);
   rowVectorIndexMap_.reserve(numPartitions_);
   for (auto pid = 0; pid < numPartitions_; ++pid) {
     rowVectorIndexMap_[pid].reserve(options_.bufferSize);
@@ -68,7 +67,6 @@ arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr
   if (currentInputColumnBytes_ > memLimit) {
     for (auto pid = 0; pid < numPartitions(); ++pid) {
       RETURN_NOT_OK(evictRowVector(pid));
-      partition2RowCount_[pid] = 0;
     }
     batches_.clear();
     currentInputColumnBytes_ = 0;
@@ -77,7 +75,7 @@ arrow::Status VeloxSortBasedShuffleWriter::doSort(facebook::velox::RowVectorPtr
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) {
+arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t /* memLimit */) {
   if (options_.partitioning == Partitioning::kSingle) {
     auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
     VELOX_CHECK_NOT_NULL(veloxColumnBatch);
@@ -199,7 +197,6 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
 arrow::Status VeloxSortBasedShuffleWriter::stop() {
   for (auto pid = 0; pid < numPartitions(); ++pid) {
     RETURN_NOT_OK(evictRowVector(pid));
-    partition2RowCount_[pid] = 0;
   }
   batches_.clear();
   currentInputColumnBytes_ = 0;
@@ -236,7 +233,6 @@ arrow::Status VeloxSortBasedShuffleWriter::reclaimFixedSize(int64_t size, int64_
   if (sortState_ == SortState::kSortInit) {
     for (auto pid = 0; pid < numPartitions(); ++pid) {
       RETURN_NOT_OK(evictRowVector(pid));
-      partition2RowCount_[pid] = 0;
     }
     batches_.clear();
     *actual = currentInputColumnBytes_;
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
index 417d5e926..bb50021fe 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -49,7 +49,7 @@ namespace gluten {
 
 enum SortState { kSortInit, kSort, kSortStop };
 
-class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
+class VeloxSortBasedShuffleWriter final : public VeloxShuffleWriter {
  public:
   static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
       uint32_t numPartitions,
@@ -81,7 +81,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
 
   void setSortState(SortState state);
 
-  arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t memLimit);
+  arrow::Status doSort(facebook::velox::RowVectorPtr rv, int64_t /* memLimit */);
 
   arrow::Status evictBatch(uint32_t partitionId);
 
@@ -92,12 +92,6 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
   std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
   std::unique_ptr<BufferOutputStream> bufferOutputStream_;
 
-  // Partition ID -> Row Count
-  // subscript: Partition ID
-  // value: How many rows does this partition have in the current input RowVector
-  // Updated for each input RowVector.
-  std::vector<uint32_t> partition2RowCount_;
-
   std::unique_ptr<facebook::velox::serializer::presto::PrestoVectorSerde> serde_ =
       std::make_unique<facebook::velox::serializer::presto::PrestoVectorSerde>();
 
@@ -105,8 +99,6 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
 
   std::unordered_map<int32_t, std::vector<int64_t>> rowVectorIndexMap_;
 
-  std::unordered_map<int32_t, std::vector<int64_t>> rowVectorPartitionMap_;
-
   uint32_t currentInputColumnBytes_ = 0;
 
   SortState sortState_{kSortInit};


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to