This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a9a07a878 [CH] Support celeborn on external sort shuffle
a9a07a878 is described below
commit a9a07a878e7f17a68669b7a6dacad854f7277c96
Author: LiuNeng <[email protected]>
AuthorDate: Thu Apr 25 11:24:37 2024 +0800
[CH] Support celeborn on external sort shuffle
What changes were proposed in this pull request?
Now we can use the external sort shuffle with Celeborn; related to #5279.
new configuration
spark.gluten.sql.columnar.backend.ch.runtime_config.spill_memory_overhead,
type: unsigned 64-bit integer (bytes), default value: 50MB (50 << 20 bytes)
The new configuration item controls the amount of additional untracked
memory overhead allowed while spilling. Some stages must not trigger a nested
spill; reserving spill_memory_overhead as headroom prevents that from
happening.
How was this patch tested?
unit tests
---
.../local-engine/Shuffle/CachedShuffleWriter.cpp | 18 +-
cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h | 1 +
cpp-ch/local-engine/Shuffle/PartitionWriter.cpp | 83 ++++++--
cpp-ch/local-engine/Shuffle/PartitionWriter.h | 38 +++-
...nClickHouseRSSColumnarSortShuffleAQESuite.scala | 208 +++++++++++++++++++++
5 files changed, 319 insertions(+), 29 deletions(-)
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
index ec16d553e..5a8629434 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
@@ -61,19 +61,13 @@ CachedShuffleWriter::CachedShuffleWriter(const String &
short_name, const SplitO
partitioner =
std::make_unique<RoundRobinSelectorBuilder>(options.partition_num,
use_external_sort_shuffle);
}
else if (short_name == "range")
- {
partitioner =
std::make_unique<RangeSelectorBuilder>(options.hash_exprs,
options.partition_num, use_external_sort_shuffle);
- }
else
- {
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "unsupported
splitter {}", short_name);
- }
Poco::StringTokenizer output_column_tokenizer(options_.out_exprs, ",");
for (const auto & iter : output_column_tokenizer)
- {
output_columns_indicies.push_back(std::stoi(iter));
- }
if (rss_pusher)
{
@@ -84,7 +78,13 @@ CachedShuffleWriter::CachedShuffleWriter(const String &
short_name, const SplitO
GetMethodID(env, celeborn_partition_pusher_class,
"pushPartitionData", "(I[BI)I");
CLEAN_JNIENV
auto celeborn_client = std::make_unique<CelebornClient>(rss_pusher,
celeborn_push_partition_data_method);
- partition_writer = std::make_unique<CelebornPartitionWriter>(this,
std::move(celeborn_client));
+ if (use_external_sort_shuffle)
+ {
+ partition_writer =
std::make_unique<ExternalSortCelebornPartitionWriter>(this,
std::move(celeborn_client));
+ sort_shuffle = true;
+ }
+ else
+ partition_writer = std::make_unique<CelebornPartitionWriter>(this,
std::move(celeborn_client));
}
else
{
@@ -94,9 +94,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String &
short_name, const SplitO
sort_shuffle = true;
}
else
- {
partition_writer = std::make_unique<LocalPartitionWriter>(this);
- }
}
split_result.partition_lengths.resize(options.partition_num, 0);
@@ -134,9 +132,7 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & block)
{
output_header = block.cloneEmpty();
for (size_t i = 0; i < block.columns(); ++i)
- {
output_columns_indicies.push_back(i);
- }
}
else
{
diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
index 82480a09f..d1dd4ff2f 100644
--- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.h
@@ -36,6 +36,7 @@ public:
friend class LocalPartitionWriter;
friend class CelebornPartitionWriter;
friend class ExternalSortLocalPartitionWriter;
+ friend class ExternalSortCelebornPartitionWriter;
explicit CachedShuffleWriter(const String & short_name, const SplitOptions
& options, jobject rss_pusher = nullptr);
~CachedShuffleWriter() override = default;
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 507194729..e0b69316d 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -181,7 +181,7 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool
for_memory_spill, bool f
if (for_memory_spill && options->throw_if_memory_exceed)
{
// escape memory track from current thread status; add untracked
memory limit for create thread object, avoid trigger memory spill again
- IgnoreMemoryTracker ignore(2 * 1024 * 1024);
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
ThreadFromGlobalPool thread(spill_to_file);
thread.join();
}
@@ -282,6 +282,11 @@ void LocalPartitionWriter::unsafeStop()
shuffle_writer->split_result.partition_lengths = offsets;
}
+void PartitionWriterSettings::loadFromContext(DB::ContextPtr context)
+{
+ spill_memory_overhead =
context->getConfigRef().getUInt64("spill_memory_overhead", 50 << 20);
+}
+
PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_)
: shuffle_writer(shuffle_writer_)
, options(&shuffle_writer->options)
@@ -294,6 +299,7 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter *
shuffle_writer_)
partition_block_buffer[partition_id] =
std::make_shared<ColumnsBuffer>(options->split_size);
partition_buffer[partition_id] = std::make_shared<Partition>();
}
+ settings.loadFromContext(SerializedPlanParser::global_context);
}
size_t PartitionWriter::evictPartitions(bool for_memory_spill, bool
flush_block_buffer)
@@ -358,7 +364,7 @@ void ExternalSortLocalPartitionWriter::write(const
PartitionInfo & info, DB::Blo
size_t ExternalSortLocalPartitionWriter::unsafeEvictPartitions(bool, bool)
{
// escape memory track
- IgnoreMemoryTracker ignore(128 * 1024 * 1024);
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
if (accumulated_blocks.empty())
return 0;
Stopwatch watch;
@@ -390,27 +396,33 @@ std::queue<Block>
ExternalSortLocalPartitionWriter::mergeDataInMemory()
return result;
}
-void ExternalSortLocalPartitionWriter::unsafeStop()
+ExternalSortLocalPartitionWriter::MergeContext
ExternalSortLocalPartitionWriter::prepareMerge()
{
- // escape memory track
- IgnoreMemoryTracker ignore(512 * 1024 * 1024);
- Stopwatch write_time_watch;
- // no data to write
- if (streams.empty() && accumulated_blocks.empty())
- return;
+ MergeContext context;
if (options->spill_firstly_before_stop)
unsafeEvictPartitions(false, false);
auto num_input = accumulated_blocks.empty() ? streams.size() :
streams.size() + 1;
std::unique_ptr<MergingSortedAlgorithm> algorithm =
std::make_unique<MergingSortedAlgorithm>(
sort_header, num_input, sort_description, max_merge_block_size, 0,
SortingQueueStrategy::Batch);
+ context.codec =
CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
+ auto sorted_memory_data = mergeDataInMemory();
+ context.merger =
std::make_unique<SortedPartitionDataMerger>(std::move(algorithm), streams,
sorted_memory_data, output_header);
+ return context;
+}
+void ExternalSortLocalPartitionWriter::unsafeStop()
+{
+ // escape memory track
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
+ Stopwatch write_time_watch;
+ // no data to write
+ if (streams.empty() && accumulated_blocks.empty())
+ return;
+ auto context = prepareMerge();
WriteBufferFromFile output(options->data_file, options->io_buffer_size);
- auto codec =
DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method),
{});
- CompressedWriteBuffer compressed_output(output, codec,
shuffle_writer->options.io_buffer_size);
- local_engine::NativeWriter native_writer(compressed_output, output_header);
+ CompressedWriteBuffer compressed_output(output, context.codec,
shuffle_writer->options.io_buffer_size);
+ NativeWriter native_writer(compressed_output, output_header);
- auto sorted_memory_data = mergeDataInMemory();
- SortedPartitionDataMerger merger(std::move(algorithm), streams,
sorted_memory_data, output_header);
std::vector<UInt64>
partition_length(shuffle_writer->options.partition_num, 0);
size_t current_file_size = 0;
size_t current_partition_raw_size = 0;
@@ -427,9 +439,9 @@ void ExternalSortLocalPartitionWriter::unsafeStop()
current_partition_raw_size = 0;
}
};
- while (!merger.isFinished())
+ while (!context.merger->isFinished())
{
- auto result = merger.next();
+ auto result = context.merger->next();
if (result.empty)
break;
for (auto & item : result.blocks)
@@ -447,6 +459,43 @@ void ExternalSortLocalPartitionWriter::unsafeStop()
shuffle_writer->split_result.total_io_time +=
compressed_output.getWriteTime();
}
+void ExternalSortCelebornPartitionWriter::unsafeStop()
+{
+ // escape memory track
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
+ Stopwatch write_time_watch;
+ // no data to write
+ if (streams.empty() && accumulated_blocks.empty())
+ return;
+ auto context = prepareMerge();
+
+ WriteBufferFromOwnString output;
+ CompressedWriteBuffer compressed_output(output, context.codec,
shuffle_writer->options.io_buffer_size);
+ NativeWriter native_writer(compressed_output, output_header);
+ std::vector<UInt64>
partition_length(shuffle_writer->options.partition_num, 0);
+
+ while (!context.merger->isFinished())
+ {
+ auto result = context.merger->next();
+ if (result.empty)
+ break;
+ for (auto & item : result.blocks)
+ {
+ shuffle_writer->split_result.raw_partition_lengths[item.second] +=
native_writer.write(item.first);
+ compressed_output.sync();
+ partition_length[item.second] += output.count();
+ Stopwatch push_time;
+ celeborn_client->pushPartitionData(item.second,
output.str().data(), output.str().size());
+ shuffle_writer->split_result.total_io_time +=
push_time.elapsedNanoseconds();
+ output.restart();
+ }
+ }
+
+ shuffle_writer->split_result.partition_lengths = partition_length;
+ shuffle_writer->split_result.total_write_time +=
write_time_watch.elapsedNanoseconds();
+ shuffle_writer->split_result.total_compress_time +=
compressed_output.getCompressTime();
+ shuffle_writer->split_result.total_io_time +=
compressed_output.getWriteTime();
+}
CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter *
shuffleWriter, std::unique_ptr<CelebornClient> celeborn_client_)
: PartitionWriter(shuffleWriter),
celeborn_client(std::move(celeborn_client_))
{
@@ -512,7 +561,7 @@ size_t
CelebornPartitionWriter::unsafeEvictSinglePartition(bool for_memory_spill
if (for_memory_spill && options->throw_if_memory_exceed)
{
// escape memory track from current thread status; add untracked
memory limit for create thread object, avoid trigger memory spill again
- IgnoreMemoryTracker ignore(2 * 1024 * 1024);
+ IgnoreMemoryTracker ignore(settings.spill_memory_overhead);
ThreadFromGlobalPool thread(spill_to_celeborn);
thread.join();
}
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
index 9dc5fbe31..9c4e75db6 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -26,6 +26,11 @@
#include <jni/CelebornClient.h>
#include <Parser/SerializedPlanParser.h>
+namespace DB
+{
+class MergingSortedAlgorithm;
+}
+
namespace local_engine
{
struct PartitionSpillInfo
@@ -59,6 +64,13 @@ private:
size_t cached_bytes = 0;
};
+struct PartitionWriterSettings
+{
+ uint64_t spill_memory_overhead = 0;
+
+ void loadFromContext(DB::ContextPtr context);
+};
+
class CachedShuffleWriter;
using PartitionPtr = std::shared_ptr<Partition>;
class PartitionWriter : boost::noncopyable
@@ -89,6 +101,7 @@ protected:
CachedShuffleWriter * shuffle_writer;
const SplitOptions * options;
+ PartitionWriterSettings settings;
std::vector<ColumnsBufferPtr> partition_block_buffer;
std::vector<PartitionPtr> partition_buffer;
@@ -118,9 +131,17 @@ protected:
std::vector<SpillInfo> spill_infos;
};
+class SortedPartitionDataMerger;
+
class ExternalSortLocalPartitionWriter : public PartitionWriter
{
public:
+ struct MergeContext
+ {
+ CompressionCodecPtr codec;
+ std::unique_ptr<SortedPartitionDataMerger> merger;
+ };
+
explicit ExternalSortLocalPartitionWriter(CachedShuffleWriter *
shuffle_writer_) : PartitionWriter(shuffle_writer_)
{
max_merge_block_size = options->split_size;
@@ -135,10 +156,11 @@ public:
protected:
size_t unsafeEvictPartitions(bool for_memory_spill, bool
flush_block_buffer) override;
+    /// Prepare for data merging, spill the remaining memory data, and create a
merger object.
+ MergeContext prepareMerge();
void unsafeStop() override;
std::queue<DB::Block> mergeDataInMemory();
-private:
size_t max_sort_buffer_size = 1_GiB;
size_t max_merge_block_size = DB::DEFAULT_BLOCK_SIZE;
size_t current_accumulated_bytes = 0;
@@ -150,6 +172,20 @@ private:
std::vector<DB::TemporaryFileStream *> streams;
};
+class ExternalSortCelebornPartitionWriter : public
ExternalSortLocalPartitionWriter
+{
+public:
+ explicit ExternalSortCelebornPartitionWriter(CachedShuffleWriter *
shuffle_writer_, std::unique_ptr<CelebornClient> celeborn_client_)
+ : ExternalSortLocalPartitionWriter(shuffle_writer_),
celeborn_client(std::move(celeborn_client_))
+ {
+ }
+protected:
+ void unsafeStop() override;
+
+private:
+ std::unique_ptr<CelebornClient> celeborn_client;
+};
+
class CelebornPartitionWriter : public PartitionWriter
{
public:
diff --git
a/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
new file mode 100644
index 000000000..0072fe8c9
--- /dev/null
+++
b/gluten-celeborn/clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseRSSColumnarSortShuffleAQESuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.CoalescedPartitionSpec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, AQEShuffleReadExec}
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenClickHouseRSSColumnarSortShuffleAQESuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val tablesPath: String = basePath + "/tpch-data-ch"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String =
+ rootPath +
"../../../../../backends-clickhouse/src/test/resources/mergetree-queries-output"
+
+ override protected val parquetTableDataPath: String =
+ "../../../../../gluten-core/src/test/resources/tpch-data"
+
+ /** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(
+ "spark.shuffle.manager",
+ "org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.shuffle.service.enabled", "false")
+ .set("spark.celeborn.client.spark.shuffle.writer", "hash")
+ .set("spark.gluten.sql.columnar.backend.ch.forceSortShuffle", "true")
+ }
+
+ test("TPCH Q1") {
+ runTPCHQuery(1) {
+ df =>
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+
+ val colCustomShuffleReaderExecs =
collect(df.queryExecution.executedPlan) {
+ case csr: AQEShuffleReadExec => csr
+ }
+ assert(colCustomShuffleReaderExecs.size == 2)
+ val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0)
+ .partitionSpecs(0)
+ .asInstanceOf[CoalescedPartitionSpec]
+ assert(coalescedPartitionSpec0.startReducerIndex == 0)
+ assert(coalescedPartitionSpec0.endReducerIndex == 5)
+ val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1)
+ .partitionSpecs(0)
+ .asInstanceOf[CoalescedPartitionSpec]
+ assert(coalescedPartitionSpec1.startReducerIndex == 0)
+ assert(coalescedPartitionSpec1.endReducerIndex == 5)
+ }
+ }
+
+ test("TPCH Q2") {
+ runTPCHQuery(2) { df => }
+ }
+
+ test("TPCH Q3") {
+ runTPCHQuery(3) { df => }
+ }
+
+ test("TPCH Q4") {
+ runTPCHQuery(4) { df => }
+ }
+
+ test("TPCH Q5") {
+ runTPCHQuery(5) { df => }
+ }
+
+ test("TPCH Q6") {
+ runTPCHQuery(6) { df => }
+ }
+
+ test("TPCH Q7") {
+ runTPCHQuery(7) { df => }
+ }
+
+ test("TPCH Q8") {
+ runTPCHQuery(8) { df => }
+ }
+
+ test("TPCH Q9") {
+ runTPCHQuery(9) { df => }
+ }
+
+ test("TPCH Q10") {
+ runTPCHQuery(10) { df => }
+ }
+
+ test("TPCH Q11") {
+ runTPCHQuery(11) {
+ df =>
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
+ case adaptive: AdaptiveSparkPlanExec => adaptive
+ }
+ assert(adaptiveSparkPlanExec.size == 2)
+ }
+ }
+
+ test("TPCH Q12") {
+ runTPCHQuery(12) { df => }
+ }
+
+ test("TPCH Q13") {
+ runTPCHQuery(13) { df => }
+ }
+
+ test("TPCH Q14") {
+ runTPCHQuery(14) { df => }
+ }
+
+ test("TPCH Q15") {
+ runTPCHQuery(15) {
+ df =>
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
+ case adaptive: AdaptiveSparkPlanExec => adaptive
+ }
+ assert(adaptiveSparkPlanExec.size == 2)
+ }
+ }
+
+ test("TPCH Q16") {
+ runTPCHQuery(16, noFallBack = false) { df => }
+ }
+
+ test("TPCH Q17") {
+ runTPCHQuery(17) { df => }
+ }
+
+ test("TPCH Q18") {
+ runTPCHQuery(18) { df => }
+ }
+
+ test("TPCH Q19") {
+ runTPCHQuery(19) { df => }
+ }
+
+ test("TPCH Q20") {
+ runTPCHQuery(20) { df => }
+ }
+
+ test("TPCH Q21") {
+ runTPCHQuery(21, noFallBack = false) { df => }
+ }
+
+ test("TPCH Q22") {
+ runTPCHQuery(22) {
+ df =>
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ val adaptiveSparkPlanExec =
collectWithSubqueries(df.queryExecution.executedPlan) {
+ case adaptive: AdaptiveSparkPlanExec => adaptive
+ }
+ assert(adaptiveSparkPlanExec.size == 3)
+ assert(adaptiveSparkPlanExec(1) == adaptiveSparkPlanExec(2))
+ }
+ }
+
+ test("fix partiton_id when spill_to_celeborn") {
+ import testImplicits._
+ withSQLConf(
+ SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "5",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10B",
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "2",
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key -> "1B"
+ ) {
+ val df = spark.sparkContext
+ .parallelize(
+ TestData(0) ::
+ TestData(0) ::
+ TestData(1) ::
+ TestData(1) ::
+ TestData(2) ::
+ TestData(2) :: Nil,
+ 3)
+ .toDF()
+ df.createOrReplaceTempView("t")
+ val res = spark.sql("select spark_partition_id(), id from t group by
id").collect()
+ assert(res.length == 3)
+ assert(res(0).getInt(0) == 0)
+ assert(res(0).getInt(1) == 0)
+ assert(res(1).getInt(0) == 1)
+ assert(res(1).getInt(1) == 1)
+ assert(res(2).getInt(0) == 2)
+ assert(res(2).getInt(1) == 2)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]