This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 60e20a3afea [fix](pipeline_x) Crc32HashPartitioner should use ShuffleChannelIds (#34147)
60e20a3afea is described below
commit 60e20a3afea42082f707aecf9d81ef2653f2b203
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Apr 26 14:33:03 2024 +0800
[fix](pipeline_x) Crc32HashPartitioner should use ShuffleChannelIds (#34147)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 8 ++++----
be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 4 ++--
be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 4 ++--
.../pipeline_x/local_exchange/local_exchange_sink_operator.h | 4 ++--
be/src/vec/runtime/partitioner.cpp | 1 -
5 files changed, 10 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 79a6ee0e748..84381c6a8af 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -221,8 +221,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
- _partitioner.reset(
- new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(channels.size()));
+ _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+ channels.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
@@ -269,8 +269,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() *
config::table_sink_partition_write_max_partition_nums_per_writer;
- _partitioner.reset(
- new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
+ _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+ _partition_count));
_partition_function.reset(new
HashPartitionFunction(_partitioner.get()));
scale_writer_partitioning_exchanger.reset(new
vectorized::ScaleWriterPartitioningExchanger<
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 5bdc5278ffc..3702c2e1a6b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -24,16 +24,16 @@
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/join_build_sink_operator.h"
-#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" // LocalExchangeChannelIds
#include "pipeline/pipeline_x/operator.h"
#include "vec/runtime/partitioner.h"
+#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
namespace doris {
class RuntimeState;
namespace pipeline {
-using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
class PartitionedHashJoinProbeOperatorX;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 3f29e3093b6..68c6b970163 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -24,9 +24,9 @@
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/join_build_sink_operator.h"
-#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" // LocalExchangeChannelIds
#include "pipeline/pipeline_x/operator.h"
#include "vec/runtime/partitioner.h"
+#include "vec/sink/vdata_stream_sender.h" // ShuffleChannelIds
namespace doris {
class ExecNode;
@@ -34,7 +34,7 @@ class RuntimeState;
namespace pipeline {
-using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType = vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>;
class PartitionedHashJoinSinkOperatorX;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index b3ecf29736f..db6662a221a 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -114,8 +114,8 @@ public:
_shuffle_idx_to_instance_idx[i] = {i, i};
}
}
- _partitioner.reset(
- new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
+ _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+ _num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index db40610723c..fadf6d73b95 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -103,6 +103,5 @@ template class Partitioner<size_t, ShuffleChannelIds>;
template class XXHashPartitioner<ShuffleChannelIds>;
template class Partitioner<uint32_t, ShuffleChannelIds>;
template class Crc32HashPartitioner<ShuffleChannelIds>;
-template class Crc32HashPartitioner<pipeline::LocalExchangeChannelIds>;
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]