This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a5214e253 [CELEBORN-1906][CIP-14] Add CelebornInputStream to cppClient
a5214e253 is described below
commit a5214e25351667cb947b84de9045df4a3a9fb009
Author: HolyLow <[email protected]>
AuthorDate: Fri Mar 14 22:31:51 2025 +0800
[CELEBORN-1906][CIP-14] Add CelebornInputStream to cppClient
### What changes were proposed in this pull request?
This PR adds CelebornInputStream to cppClient.
### Why are the changes needed?
CelebornInputStream is the stream that feeds shuffle data to the reader client.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation only; no tests were added.
Closes #3151 from HolyLow/issue/celeborn-1906-add-celeborn-input-stream-to-cppclient.
Authored-by: HolyLow <[email protected]>
Signed-off-by: mingji <[email protected]>
---
cpp/celeborn/client/CMakeLists.txt | 3 +-
cpp/celeborn/client/reader/CelebornInputStream.cpp | 214 +++++++++++++++++++++
cpp/celeborn/client/reader/CelebornInputStream.h | 80 ++++++++
3 files changed, 296 insertions(+), 1 deletion(-)
diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt
index eaa077e3a..f9edfe949 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -14,7 +14,8 @@
# limitations under the License.
add_library(
client
- reader/WorkerPartitionReader.cpp)
+ reader/WorkerPartitionReader.cpp
+ reader/CelebornInputStream.cpp)
target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp b/cpp/celeborn/client/reader/CelebornInputStream.cpp
new file mode 100644
index 000000000..a3023d6f3
--- /dev/null
+++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/client/reader/CelebornInputStream.h"
+
+#include <algorithm>
+#include <limits>
+
+namespace celeborn {
+namespace client {
+CelebornInputStream::CelebornInputStream(
+ const std::string& shuffleKey,
+ const std::shared_ptr<const conf::CelebornConf>& conf,
+ const std::shared_ptr<network::TransportClientFactory>& clientFactory,
+ std::vector<std::shared_ptr<const protocol::PartitionLocation>>&&
locations,
+ const std::vector<int>& attempts,
+ int attemptNumber,
+ int startMapIndex,
+ int endMapIndex)
+ : shuffleKey_(shuffleKey),
+ conf_(conf),
+ clientFactory_(clientFactory),
+ locations_(std::move(locations)),
+ attempts_(attempts),
+ attemptNumber_(attemptNumber),
+ startMapIndex_(startMapIndex),
+ endMapIndex_(endMapIndex),
+ currLocationIndex_(0),
+ currBatchPos_(0),
+ currBatchSize_(0) {
+ moveToNextReader();
+}
+
+// Copies up to `len` bytes of batch payload into `buffer + offset`.
+//
+// Batch headers are consumed by fillBuffer(); only payload bytes reach the
+// caller. Because the byte count is returned as an int, a single call never
+// copies more than INT_MAX bytes; larger requests get a short read.
+//
+// Returns the number of bytes copied, 0 when `len` is 0, or -1 when the
+// stream is exhausted before any byte was copied.
+int CelebornInputStream::read(uint8_t* buffer, size_t offset, size_t len) {
+  CELEBORN_CHECK_NOT_NULL(buffer);
+  if (len == 0) {
+    return 0;
+  }
+  // Keep the result representable in the int return type.
+  len = std::min(len, static_cast<size_t>(std::numeric_limits<int>::max()));
+  uint8_t* buf = buffer + offset;
+
+  size_t readBytes = 0;
+  while (readBytes < len) {
+    // Advance until the current batch still has unread payload.
+    while (currBatchPos_ >= currBatchSize_) {
+      if (!fillBuffer()) {
+        // End of stream: report a partial read, or -1 if nothing was read.
+        return readBytes > 0 ? static_cast<int>(readBytes) : -1;
+      }
+    }
+    const size_t batchRemainingSize = currBatchSize_ - currBatchPos_;
+    const size_t toReadBytes = std::min(len - readBytes, batchRemainingSize);
+    CELEBORN_CHECK_GE(currChunk_->remainingSize(), toReadBytes);
+    const auto size = currChunk_->readToBuffer(&buf[readBytes], toReadBytes);
+    CELEBORN_CHECK_EQ(toReadBytes, size);
+    readBytes += toReadBytes;
+    currBatchPos_ += toReadBytes;
+  }
+  return static_cast<int>(readBytes);
+}
+
+// Positions the stream at the start of the next batch to deliver.
+//
+// Consumes batch headers from the current chunk, pulling further chunks and
+// locations via moveToNextChunk() as needed. A batch is delivered only if its
+// attemptId equals attempts_[mapId] and its batchId has not been recorded for
+// that mapId yet. Every other batch has its payload skipped, so the next
+// header is always read from a header boundary. On success the chunk is left
+// at the first payload byte, currBatchSize_ holds the payload size and
+// currBatchPos_ is 0.
+//
+// Returns false when there is no more data.
+bool CelebornInputStream::fillBuffer() {
+  if (!currChunk_) {
+    return false;
+  }
+
+  while (currChunk_->remainingSize() > 0 || moveToNextChunk()) {
+    // Header: mapId, attemptId, batchId, size (four ints, 16 bytes).
+    CELEBORN_CHECK_GE(currChunk_->remainingSize(), 4 * 4);
+    // TODO: in java this is UNSAFE and PLATFORM related. hand-crafted here,
+    // might not be safe.
+    const int mapId = currChunk_->readLE<int>();
+    const int attemptId = currChunk_->readLE<int>();
+    const int batchId = currChunk_->readLE<int>();
+    const int size = currChunk_->readLE<int>();
+    CELEBORN_CHECK_GE(size, 0);
+    CELEBORN_CHECK_GE(currChunk_->remainingSize(), size);
+    CELEBORN_CHECK_GE(mapId, 0);
+    CELEBORN_CHECK_LT(mapId, attempts_.size());
+
+    // TODO: compression is not supported yet!
+
+    if (attemptId != attempts_[mapId]) {
+      // Batch from a non-accepted attempt of this map. Drop its payload so
+      // the next iteration starts on the following header instead of
+      // misinterpreting payload bytes as one.
+      currChunk_->skip(size);
+      continue;
+    }
+    if (!getBatchRecord(mapId).insert(batchId).second) {
+      // This batchId was already delivered for this mapId; drop it.
+      currChunk_->skip(size);
+      continue;
+    }
+    currBatchSize_ = size;
+    currBatchPos_ = 0;
+    return true;
+  }
+  return false;
+}
+
+// Replaces currChunk_ with the next chunk of the stream.
+//
+// First asks the current reader for another chunk. If it is drained, opens
+// the next location that has data. Returns true iff a new chunk was loaded.
+// On false, currChunk_ and currReader_ are both null.
+bool CelebornInputStream::moveToNextChunk() {
+  currChunk_.reset();
+
+  // cleanupReader() nulls currReader_, so never dereference it blindly.
+  if (!currReader_) {
+    return false;
+  }
+  if (currReader_->hasNext()) {
+    currChunk_ = getNextChunk();
+    return true;
+  }
+  if (static_cast<size_t>(currLocationIndex_) < locations_.size()) {
+    // moveToNextReader() leaves currReader_ non-null only when it also
+    // loaded a chunk.
+    moveToNextReader();
+    return currReader_ != nullptr;
+  }
+  cleanupReader();
+  return false;
+}
+
+// Pulls the next chunk from currReader_ and validates its batch framing
+// before handing it to the caller.
+std::unique_ptr<memory::ReadOnlyByteBuffer>
+CelebornInputStream::getNextChunk() {
+  // TODO: support the failure retrying, including excluding the failed
+  // location, open a reader to read from the location's peer.
+  auto chunk = currReader_->next();
+  verifyChunk(chunk);
+  // Return the local directly: the compiler elides or implicitly moves it,
+  // whereas an explicit std::move would inhibit copy elision.
+  return chunk;
+}
+
+// Walks every batch header in `chunk` and checks that the framing is sane:
+// each header is complete, its payload size is non-negative and fits in the
+// remaining bytes, and its mapId indexes into attempts_.
+void CelebornInputStream::verifyChunk(
+    const std::unique_ptr<memory::ReadOnlyByteBuffer>& chunk) {
+  CELEBORN_CHECK_NOT_NULL(chunk);
+  // Walk a clone so that `chunk` itself is not consumed here (assumes clone()
+  // yields an independent read position -- TODO confirm).
+  auto data = chunk->clone();
+  while (data->remainingSize() > 0) {
+    // Header: mapId, attemptId, batchId, size (four ints, 16 bytes).
+    CELEBORN_CHECK_GE(data->remainingSize(), 4 * 4);
+    // TODO: in java this is UNSAFE and PLATFORM related. hand-crafted here,
+    // might not be safe.
+    const int mapId = data->readLE<int>();
+    // attemptId and batchId do not affect framing; step over both ints.
+    data->skip(2 * 4);
+    const int size = data->readLE<int>();
+    CELEBORN_CHECK_GE(size, 0);
+    CELEBORN_CHECK_GE(data->remainingSize(), size);
+    CELEBORN_CHECK_GE(mapId, 0);
+    CELEBORN_CHECK_LT(mapId, attempts_.size());
+    data->skip(size);
+  }
+}
+
+// Advances to the next location whose reader yields at least one chunk.
+//
+// On return, either currReader_ is non-null and currChunk_ holds its first
+// chunk, or no location remained and currReader_ is null. Locations whose
+// readers have no data are skipped. This is a loop rather than recursion so
+// a long run of empty locations cannot exhaust the stack.
+void CelebornInputStream::moveToNextReader() {
+  while (true) {
+    cleanupReader();
+    auto location = nextReadableLocation();
+    if (!location) {
+      return;
+    }
+    currReader_ = createReaderWithRetry(*location);
+    currLocationIndex_++;
+    if (currReader_->hasNext()) {
+      currChunk_ = getNextChunk();
+      return;
+    }
+  }
+}
+
+// Opens a PartitionReader for `location`. It currently delegates straight to
+// createReader(); retry and peer fail-over are still TODO.
+std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
+    const protocol::PartitionLocation& location) {
+  // TODO: support retrying when createReader failed. Maybe switch to peer
+  // location?
+  auto reader = createReader(location);
+  return reader;
+}
+
+// Builds the PartitionReader matching the storage type of `location`.
+// HDD and SSD locations get a WorkerPartitionReader. Every other type,
+// HDFS included, fails via CELEBORN_FAIL.
+std::shared_ptr<PartitionReader> CelebornInputStream::createReader(
+    const protocol::PartitionLocation& location) {
+  const auto storageType = location.storageInfo->type;
+  const bool readFromWorker = storageType == protocol::StorageInfo::HDD ||
+      storageType == protocol::StorageInfo::SSD;
+  if (readFromWorker) {
+    // TODO: support localPartitionReader...
+    return WorkerPartitionReader::create(
+        conf_,
+        shuffleKey_,
+        location,
+        startMapIndex_,
+        endMapIndex_,
+        clientFactory_.get());
+  }
+  // TODO: support DfsPartitionReader...
+  CELEBORN_FAIL(
+      "unsupported protocol::StorageInfo type " +
+      std::to_string(storageType));
+}
+
+// Returns the location at currLocationIndex_, or nullptr once every location
+// has been handed out. Does not advance the index; callers do that.
+std::shared_ptr<const protocol::PartitionLocation>
+CelebornInputStream::nextReadableLocation() {
+  // TODO: support skipLocation functionality...
+  // TODO: the currLocationIndex_ management is a mess. might be
+  // managed all within this function?
+  const auto index = static_cast<size_t>(currLocationIndex_);
+  if (index < locations_.size()) {
+    return locations_[index];
+  }
+  return nullptr;
+}
+
+// Returns the set of batchIds already delivered for `mapId`, creating an
+// empty set on first use.
+std::unordered_set<int>& CelebornInputStream::getBatchRecord(int mapId) {
+  CELEBORN_CHECK_GE(mapId, 0);
+  const auto index = static_cast<size_t>(mapId);
+  // Only ever grow the table. An unconditional resize(mapId + 1) would shrink
+  // it when a smaller mapId arrives, destroying the records of larger mapIds
+  // and letting their duplicate batches through.
+  if (index >= batchRecords_.size()) {
+    batchRecords_.resize(index + 1);
+  }
+  auto& record = batchRecords_[index];
+  if (!record) {
+    record = std::make_unique<std::unordered_set<int>>();
+  }
+  return *record;
+}
+
+// Drops this stream's reference to the current partition reader.
+void CelebornInputStream::cleanupReader() {
+  currReader_.reset();
+}
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h b/cpp/celeborn/client/reader/CelebornInputStream.h
new file mode 100644
index 000000000..af143321c
--- /dev/null
+++ b/cpp/celeborn/client/reader/CelebornInputStream.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/client/reader/WorkerPartitionReader.h"
+#include "celeborn/conf/CelebornConf.h"
+
+namespace celeborn {
+namespace client {
+// Presents the shuffle data stored at a list of partition locations as one
+// byte stream.
+//
+// Locations are opened one after another. Inside each chunk, every batch
+// starts with a 16-byte header (mapId, attemptId, batchId, size) that read()
+// strips, so callers only receive batch payloads. A batch is delivered only
+// when its attemptId equals attempts[mapId] and its batchId has not been
+// delivered before for that mapId.
+class CelebornInputStream {
+ public:
+  // Takes ownership of `locations` and opens the first location that has
+  // data before returning.
+  CelebornInputStream(
+      const std::string& shuffleKey,
+      const std::shared_ptr<const conf::CelebornConf>& conf,
+      const std::shared_ptr<network::TransportClientFactory>& clientFactory,
+      std::vector<std::shared_ptr<const protocol::PartitionLocation>>&&
+          locations,
+      const std::vector<int>& attempts,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex);
+
+  // Copies up to `len` payload bytes into `buffer + offset`. Returns the
+  // number of bytes copied, 0 if `len` is 0, or -1 once the stream is
+  // exhausted and nothing was copied.
+  int read(uint8_t* buffer, size_t offset, size_t len);
+
+ private:
+  // Positions the stream at the next batch to deliver; false at end.
+  bool fillBuffer();
+
+  // Loads the next chunk, moving on to the next location when the current
+  // reader is drained; false when nothing is left.
+  bool moveToNextChunk();
+
+  // Pulls one chunk from currReader_ and validates its framing.
+  std::unique_ptr<memory::ReadOnlyByteBuffer> getNextChunk();
+
+  // Checks the batch-header framing of `chunk`, walking a clone of it.
+  void verifyChunk(const std::unique_ptr<memory::ReadOnlyByteBuffer>& chunk);
+
+  // Opens the next location whose reader has at least one chunk.
+  void moveToNextReader();
+
+  // Reader factory hook; retry is not implemented yet.
+  std::shared_ptr<PartitionReader> createReaderWithRetry(
+      const protocol::PartitionLocation& location);
+
+  // Builds a reader according to the location's storage type.
+  std::shared_ptr<PartitionReader> createReader(
+      const protocol::PartitionLocation& location);
+
+  // Returns locations_[currLocationIndex_], or nullptr past the end.
+  std::shared_ptr<const protocol::PartitionLocation> nextReadableLocation();
+
+  // Returns the batchIds already delivered for `mapId`.
+  std::unordered_set<int>& getBatchRecord(int mapId);
+
+  // Releases the current reader.
+  void cleanupReader();
+
+  // Read parameters, fixed at construction.
+  std::string shuffleKey_;
+  std::shared_ptr<const conf::CelebornConf> conf_;
+  std::shared_ptr<network::TransportClientFactory> clientFactory_;
+  std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations_;
+  // attempts_[mapId] is the only attemptId whose batches are delivered.
+  std::vector<int> attempts_;
+  // NOTE(review): stored but not read anywhere in CelebornInputStream.cpp.
+  int attemptNumber_;
+  int startMapIndex_;
+  int endMapIndex_;
+
+  // Cursor state.
+  int currLocationIndex_{0};
+  std::unique_ptr<memory::ReadOnlyByteBuffer> currChunk_;
+  size_t currBatchPos_{0};
+  size_t currBatchSize_{0};
+  std::shared_ptr<PartitionReader> currReader_;
+  // batchRecords_[mapId] holds the batchIds already delivered for that map;
+  // entries are allocated lazily.
+  std::vector<std::unique_ptr<std::unordered_set<int>>> batchRecords_;
+};
+} // namespace client
+} // namespace celeborn