This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a5214e253 [CELEBORN-1906][CIP-14] Add CelebornInputStream to cppClient
a5214e253 is described below

commit a5214e25351667cb947b84de9045df4a3a9fb009
Author: HolyLow <[email protected]>
AuthorDate: Fri Mar 14 22:31:51 2025 +0800

    [CELEBORN-1906][CIP-14] Add CelebornInputStream to cppClient
    
    ### What changes were proposed in this pull request?
    This PR adds CelebornInputStream to cppClient.
    
    ### Why are the changes needed?
    The CelebornInputStream is the readerClient's feeding stream.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Compilation.
    
    Closes #3151 from HolyLow/issue/celeborn-1906-add-celeborn-input-stream-to-cppclient.
    
    Authored-by: HolyLow <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 cpp/celeborn/client/CMakeLists.txt                 |   3 +-
 cpp/celeborn/client/reader/CelebornInputStream.cpp | 214 +++++++++++++++++++++
 cpp/celeborn/client/reader/CelebornInputStream.h   |  80 ++++++++
 3 files changed, 296 insertions(+), 1 deletion(-)

diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt
index eaa077e3a..f9edfe949 100644
--- a/cpp/celeborn/client/CMakeLists.txt
+++ b/cpp/celeborn/client/CMakeLists.txt
@@ -14,7 +14,8 @@
 # limitations under the License.
 add_library(
         client
-        reader/WorkerPartitionReader.cpp)
+        reader/WorkerPartitionReader.cpp
+        reader/CelebornInputStream.cpp)
 
 target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})
 
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.cpp b/cpp/celeborn/client/reader/CelebornInputStream.cpp
new file mode 100644
index 000000000..a3023d6f3
--- /dev/null
+++ b/cpp/celeborn/client/reader/CelebornInputStream.cpp
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "celeborn/client/reader/CelebornInputStream.h"
+
+#include <algorithm>
+#include <limits>
+
+namespace celeborn {
+namespace client {
+// Builds a stream over `locations` (read in order) and eagerly opens the
+// first location whose reader has a chunk available (via moveToNextReader).
+// `attempts` is indexed by mapId and holds the attempt id whose batches are
+// accepted for that map.
+// NOTE(review): `attemptNumber` is stored but not read anywhere in this
+// class yet.
+CelebornInputStream::CelebornInputStream(
+    const std::string& shuffleKey,
+    const std::shared_ptr<const conf::CelebornConf>& conf,
+    const std::shared_ptr<network::TransportClientFactory>& clientFactory,
+    std::vector<std::shared_ptr<const protocol::PartitionLocation>>&& locations,
+    const std::vector<int>& attempts,
+    int attemptNumber,
+    int startMapIndex,
+    int endMapIndex)
+    : shuffleKey_(shuffleKey),
+      conf_(conf),
+      clientFactory_(clientFactory),
+      locations_(std::move(locations)),
+      attempts_(attempts),
+      attemptNumber_(attemptNumber),
+      startMapIndex_(startMapIndex),
+      endMapIndex_(endMapIndex),
+      currLocationIndex_(0),
+      currBatchPos_(0),
+      currBatchSize_(0) {
+  moveToNextReader();
+}
+
+// Copies up to `len` batch payload bytes into `buffer + offset`, pulling
+// further batches via fillBuffer() whenever the current one is drained.
+//
+// Returns the number of bytes copied (never more than INT_MAX, because the
+// result is an int), 0 when `len` is 0, or -1 when the stream is exhausted
+// and nothing was copied. Fails a CELEBORN_CHECK if `buffer` is null.
+int CelebornInputStream::read(uint8_t* buffer, size_t offset, size_t len) {
+  CELEBORN_CHECK_NOT_NULL(buffer);
+  if (len == 0) {
+    return 0;
+  }
+  // The byte count is returned as an int; never copy more than it can hold.
+  len = std::min(len, static_cast<size_t>(std::numeric_limits<int>::max()));
+
+  uint8_t* const dest = buffer + offset;
+  size_t readBytes = 0;
+  while (readBytes < len) {
+    // Advance until a batch with unread payload bytes is available.
+    while (currBatchPos_ >= currBatchSize_) {
+      if (!fillBuffer()) {
+        // Explicit int arms: a mixed size_t/int ternary would route -1
+        // through SIZE_MAX before converting back to int.
+        return readBytes > 0 ? static_cast<int>(readBytes) : -1;
+      }
+    }
+    const size_t batchRemainingSize = currBatchSize_ - currBatchPos_;
+    const size_t toReadBytes = std::min(len - readBytes, batchRemainingSize);
+    CELEBORN_CHECK_GE(currChunk_->remainingSize(), toReadBytes);
+    const auto size = currChunk_->readToBuffer(dest + readBytes, toReadBytes);
+    CELEBORN_CHECK_EQ(toReadBytes, size);
+    readBytes += toReadBytes;
+    currBatchPos_ += toReadBytes;
+  }
+  return static_cast<int>(readBytes);
+}
+
+// Positions the stream at the next batch to deliver. On success the batch
+// header has been consumed and its `size` payload bytes are the next bytes of
+// currChunk_; currBatchSize_/currBatchPos_ describe that batch.
+//
+// Each batch starts with four little-endian ints: mapId, attemptId, batchId,
+// size. A batch is delivered only if attemptId equals attempts_[mapId] and
+// its batchId has not been delivered before for that mapId. Every other
+// batch's payload is skipped, so that the next header is read from the
+// correct offset.
+//
+// Returns true if such a batch was found, false when no data is left.
+bool CelebornInputStream::fillBuffer() {
+  if (!currChunk_) {
+    return false;
+  }
+
+  // Header layout: mapId, attemptId, batchId, size.
+  constexpr size_t kBatchHeaderSize = 4 * sizeof(int);
+  while (currChunk_->remainingSize() > 0 || moveToNextChunk()) {
+    if (currChunk_->remainingSize() == 0) {
+      // Empty chunk: the loop condition fetches the next one.
+      continue;
+    }
+    CELEBORN_CHECK_GE(currChunk_->remainingSize(), kBatchHeaderSize);
+    // TODO: in java this is UNSAFE and PLATFORM related. hand-crafted here,
+    // might not be safe.
+    const int mapId = currChunk_->readLE<int>();
+    const int attemptId = currChunk_->readLE<int>();
+    const int batchId = currChunk_->readLE<int>();
+    const int size = currChunk_->readLE<int>();
+    CELEBORN_CHECK_GE(mapId, 0);
+    CELEBORN_CHECK_GE(size, 0);
+    CELEBORN_CHECK_GE(currChunk_->remainingSize(), static_cast<size_t>(size));
+    CELEBORN_CHECK_LT(static_cast<size_t>(mapId), attempts_.size());
+
+    // TODO: compression is not supported yet!
+
+    if (attemptId == attempts_[mapId] &&
+        getBatchRecord(mapId).insert(batchId).second) {
+      // First occurrence of this batch for the accepted attempt.
+      currBatchSize_ = size;
+      currBatchPos_ = 0;
+      return true;
+    }
+    // Duplicated batch, or a batch from a non-accepted attempt: discard its
+    // payload so the next iteration reads a header rather than payload bytes.
+    currChunk_->skip(size);
+  }
+
+  return false;
+}
+
+// Drops the current chunk and loads the next one. It first asks the current
+// reader; once that reader is drained, it opens the following locations.
+// Returns false (with both currChunk_ and currReader_ cleared) once every
+// location has been consumed.
+bool CelebornInputStream::moveToNextChunk() {
+  currChunk_.reset();
+
+  if (!currReader_->hasNext()) {
+    if (currLocationIndex_ >= locations_.size()) {
+      // No reader has anything left: release the last one.
+      cleanupReader();
+      return false;
+    }
+    // Current reader drained; moveToNextReader loads currChunk_ on success.
+    moveToNextReader();
+    return currReader_ != nullptr;
+  }
+
+  currChunk_ = getNextChunk();
+  return true;
+}
+
+// Fetches the next chunk from currReader_ and validates its batch framing
+// before handing it out. Called only after currReader_->hasNext() returned
+// true.
+std::unique_ptr<memory::ReadOnlyByteBuffer>
+CelebornInputStream::getNextChunk() {
+  // TODO: support the failure retrying, including excluding the failed
+  // location, open a reader to read from the location's peer.
+  auto chunk = currReader_->next();
+  CELEBORN_CHECK_NOT_NULL(chunk);
+  verifyChunk(chunk);
+  // Return by name: `return std::move(chunk)` would inhibit copy elision.
+  return chunk;
+}
+
+// Walks a clone of `chunk` and checks that it is a well-formed sequence of
+// batches. For each batch it checks that:
+//   - the 16-byte header is complete;
+//   - mapId and size are non-negative;
+//   - mapId indexes attempts_;
+//   - the declared payload fits in the remaining bytes.
+// NOTE(review): assumes clone() gives an independent read position, since
+// fillBuffer later re-reads the same headers from the original chunk.
+void CelebornInputStream::verifyChunk(
+    const std::unique_ptr<memory::ReadOnlyByteBuffer>& chunk) {
+  // Header layout: mapId, attemptId, batchId, size.
+  constexpr size_t kBatchHeaderSize = 4 * sizeof(int);
+  auto data = chunk->clone();
+  while (data->remainingSize() > 0) {
+    CELEBORN_CHECK_GE(data->remainingSize(), kBatchHeaderSize);
+    // TODO: in java this is UNSAFE and PLATFORM related. hand-crafted here,
+    // might not be safe.
+    const int mapId = data->readLE<int>();
+    // attemptId and batchId are not validated here; step over them.
+    data->skip(2 * sizeof(int));
+    const int size = data->readLE<int>();
+    CELEBORN_CHECK_GE(mapId, 0);
+    CELEBORN_CHECK_GE(size, 0);
+    CELEBORN_CHECK_GE(data->remainingSize(), static_cast<size_t>(size));
+    CELEBORN_CHECK_LT(static_cast<size_t>(mapId), attempts_.size());
+    data->skip(size);
+  }
+}
+
+// Releases the current reader, then opens readers for the remaining
+// locations in order until one has a chunk available. That chunk becomes
+// currChunk_. Every opened location advances currLocationIndex_, whether or
+// not it had data. If no remaining location has data, currReader_ is left
+// null.
+// Iterative rather than recursive, so that a long run of empty locations
+// cannot exhaust the stack.
+void CelebornInputStream::moveToNextReader() {
+  cleanupReader();
+  while (auto location = nextReadableLocation()) {
+    currReader_ = createReaderWithRetry(*location);
+    currLocationIndex_++;
+    if (currReader_->hasNext()) {
+      currChunk_ = getNextChunk();
+      return;
+    }
+    // This location is empty; release its reader and try the next one.
+    cleanupReader();
+  }
+}
+
+// Opens a reader for `location`. No retry is performed yet: this makes a
+// single call to createReader() and returns its result.
+std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
+    const protocol::PartitionLocation& location) {
+  // TODO: support retrying when createReader failed. Maybe switch to peer
+  // location?
+  auto reader = createReader(location);
+  return reader;
+}
+
+// Creates a reader matching the storage type of `location`. HDD and SSD
+// locations are read through WorkerPartitionReader, limited to the map range
+// [startMapIndex_, endMapIndex_] as interpreted by that reader. Any other
+// storage type (including HDFS) fails with CELEBORN_FAIL.
+std::shared_ptr<PartitionReader> CelebornInputStream::createReader(
+    const protocol::PartitionLocation& location) {
+  // Report a clear error instead of dereferencing a missing storageInfo.
+  CELEBORN_CHECK_NOT_NULL(location.storageInfo);
+  switch (location.storageInfo->type) {
+    case protocol::StorageInfo::HDD:
+    case protocol::StorageInfo::SSD: {
+      // TODO: support localPartitionReader...
+      return WorkerPartitionReader::create(
+          conf_,
+          shuffleKey_,
+          location,
+          startMapIndex_,
+          endMapIndex_,
+          clientFactory_.get());
+    }
+    case protocol::StorageInfo::HDFS:
+    default:
+      // TODO: support DfsPartitionReader...
+      CELEBORN_FAIL(
+          "unsupported protocol::StorageInfo type " +
+          std::to_string(location.storageInfo->type));
+  }
+}
+
+// Returns the location at currLocationIndex_, or nullptr once every location
+// has been handed out. Does not advance the index; moveToNextReader does.
+// TODO: support skipLocation functionality...
+// TODO: the currLocationIndex_ management is a mess. might be
+// managed all within this function?
+std::shared_ptr<const protocol::PartitionLocation>
+CelebornInputStream::nextReadableLocation() {
+  if (currLocationIndex_ < locations_.size()) {
+    return locations_[currLocationIndex_];
+  }
+  return nullptr;
+}
+
+// Returns the set of batch ids already delivered for `mapId`, creating an
+// empty set on first use. Fails a CELEBORN_CHECK if `mapId` is negative.
+std::unordered_set<int>& CelebornInputStream::getBatchRecord(int mapId) {
+  CELEBORN_CHECK_GE(mapId, 0);
+  const auto index = static_cast<size_t>(mapId);
+  if (batchRecords_.size() <= index) {
+    // Grow only. An unconditional resize(mapId + 1) would shrink the vector
+    // when a smaller mapId arrives later. That would destroy the records of
+    // larger mapIds and let duplicate batches through.
+    batchRecords_.resize(index + 1);
+  }
+  auto& record = batchRecords_[index];
+  if (!record) {
+    record = std::make_unique<std::unordered_set<int>>();
+  }
+  return *record;
+}
+
+// Drops this stream's reference to the current partition reader.
+void CelebornInputStream::cleanupReader() {
+  currReader_.reset();
+}
+} // namespace client
+} // namespace celeborn
diff --git a/cpp/celeborn/client/reader/CelebornInputStream.h b/cpp/celeborn/client/reader/CelebornInputStream.h
new file mode 100644
index 000000000..af143321c
--- /dev/null
+++ b/cpp/celeborn/client/reader/CelebornInputStream.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "celeborn/client/reader/WorkerPartitionReader.h"
+#include "celeborn/conf/CelebornConf.h"
+
+namespace celeborn {
+namespace client {
+// Reads the shuffle data of a set of partition locations as one byte stream.
+//
+// Locations are opened one after another; only HDD/SSD locations are
+// supported so far. Each chunk returned by a location is a sequence of
+// batches. Each batch is prefixed by four little-endian ints: mapId,
+// attemptId, batchId and payload size. read() is meant to yield the payloads
+// of batches whose attemptId equals attempts[mapId], each (mapId, batchId)
+// pair at most once. Compression is not supported yet.
+//
+// NOTE(review): no synchronization is visible in this class; presumably an
+// instance must not be used from several threads at once.
+class CelebornInputStream {
+ public:
+  // Takes ownership of `locations` and opens the first one that has data.
+  // `attempts` is indexed by mapId.
+  // NOTE(review): `attemptNumber` is stored but not yet read by the
+  // implementation.
+  CelebornInputStream(
+      const std::string& shuffleKey,
+      const std::shared_ptr<const conf::CelebornConf>& conf,
+      const std::shared_ptr<network::TransportClientFactory>& clientFactory,
+      std::vector<std::shared_ptr<const protocol::PartitionLocation>>&&
+          locations,
+      const std::vector<int>& attempts,
+      int attemptNumber,
+      int startMapIndex,
+      int endMapIndex);
+
+  // Copies up to `len` bytes into `buffer + offset`. Returns the number of
+  // bytes copied, 0 if `len` is 0, or -1 once the stream is exhausted and
+  // nothing was copied.
+  int read(uint8_t* buffer, size_t offset, size_t len);
+
+ private:
+  // Advances to the next batch to deliver; false when no data is left.
+  bool fillBuffer();
+
+  // Replaces currChunk_ with the next chunk, opening the next location when
+  // the current reader is drained; false when no chunk remains.
+  bool moveToNextChunk();
+
+  // Fetches the next chunk of currReader_ and validates it.
+  std::unique_ptr<memory::ReadOnlyByteBuffer> getNextChunk();
+
+  // Checks that a chunk is a well-formed sequence of batches.
+  void verifyChunk(const std::unique_ptr<memory::ReadOnlyByteBuffer>& chunk);
+
+  // Opens readers for the remaining locations until one has a chunk.
+  void moveToNextReader();
+
+  // Opens a reader for `location`; retrying is not implemented yet.
+  std::shared_ptr<PartitionReader> createReaderWithRetry(
+      const protocol::PartitionLocation& location);
+
+  // Creates a reader matching the storage type of `location`.
+  std::shared_ptr<PartitionReader> createReader(
+      const protocol::PartitionLocation& location);
+
+  // Location at currLocationIndex_, or nullptr when all have been used.
+  std::shared_ptr<const protocol::PartitionLocation> nextReadableLocation();
+
+  // Batch ids already delivered for `mapId`, created on first use.
+  std::unordered_set<int>& getBatchRecord(int mapId);
+
+  // Releases the current reader.
+  void cleanupReader();
+
+  std::string shuffleKey_;
+  std::shared_ptr<const conf::CelebornConf> conf_;
+  std::shared_ptr<network::TransportClientFactory> clientFactory_;
+  std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations_;
+  // Accepted attempt id per mapId.
+  std::vector<int> attempts_;
+  int attemptNumber_;
+  int startMapIndex_;
+  int endMapIndex_;
+
+  // Index into locations_ of the next location to open. size_t so that
+  // comparisons against locations_.size() do not mix signedness.
+  size_t currLocationIndex_{0};
+  // Chunk currently being consumed; null once the stream is exhausted.
+  std::unique_ptr<memory::ReadOnlyByteBuffer> currChunk_;
+  // Read position within, and total size of, the batch being delivered.
+  size_t currBatchPos_{0};
+  size_t currBatchSize_{0};
+  std::shared_ptr<PartitionReader> currReader_;
+  // Indexed by mapId; entries are created lazily by getBatchRecord().
+  std::vector<std::unique_ptr<std::unordered_set<int>>> batchRecords_;
+};
+} // namespace client
+} // namespace celeborn

Reply via email to