This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dcf83f75 [GLUTEN-6887][VL] Daily Update Velox Version (2024_08_16) (#6872)
6dcf83f75 is described below

commit 6dcf83f7527fc4e98f4cdc70b19d36b1080bcb82
Author: Gluten Performance Bot 
<[email protected]>
AuthorDate: Fri Aug 16 19:15:51 2024 +0800

    [GLUTEN-6887][VL] Daily Update Velox Version (2024_08_16) (#6872)
---
 cpp/velox/memory/VeloxMemoryManager.cc             |  17 +-
 .../serializer/VeloxColumnarBatchSerializer.cc     |   2 +-
 cpp/velox/shuffle/GlutenByteStream.h               | 267 +++++++++++++++++++++
 cpp/velox/shuffle/VeloxShuffleReader.cc            |   5 +-
 ep/build-velox/src/get_velox.sh                    |   2 +-
 5 files changed, 284 insertions(+), 9 deletions(-)

diff --git a/cpp/velox/memory/VeloxMemoryManager.cc 
b/cpp/velox/memory/VeloxMemoryManager.cc
index 0a57d6a99..6b5606dd2 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -63,11 +63,18 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator {
   ListenableArbitrator(const Config& config, AllocationListener* listener)
       : MemoryArbitrator(config),
         listener_(listener),
-        memoryPoolInitialCapacity_(
-            getConfig<uint64_t>(config.extraConfigs, 
kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)),
-        memoryPoolTransferCapacity_(
-            getConfig<uint64_t>(config.extraConfigs, 
kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) {
-  }
+        memoryPoolInitialCapacity_(velox::config::toCapacity(
+            getConfig<std::string>(
+                config.extraConfigs,
+                kMemoryPoolInitialCapacity,
+                std::to_string(kDefaultMemoryPoolInitialCapacity)),
+            velox::config::CapacityUnit::BYTE)),
+        memoryPoolTransferCapacity_(velox::config::toCapacity(
+            getConfig<std::string>(
+                config.extraConfigs,
+                kMemoryPoolTransferCapacity,
+                std::to_string(kDefaultMemoryPoolTransferCapacity)),
+            velox::config::CapacityUnit::BYTE)) {}
   std::string kind() const override {
     return kind_;
   }
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc 
b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index 8b21e7bdb..acb14cf4d 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -34,7 +34,7 @@ namespace {
 std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
   std::vector<ByteRange> byteRanges;
   byteRanges.push_back(ByteRange{data, size, 0});
-  auto byteStream = std::make_unique<ByteInputStream>(byteRanges);
+  auto byteStream = std::make_unique<BufferInputStream>(byteRanges);
   return byteStream;
 }
 } // namespace
diff --git a/cpp/velox/shuffle/GlutenByteStream.h 
b/cpp/velox/shuffle/GlutenByteStream.h
new file mode 100644
index 000000000..78ea7b905
--- /dev/null
+++ b/cpp/velox/shuffle/GlutenByteStream.h
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO: Delete this header once the RSS sort shuffle reader has been refactored.
+#include "velox/common/memory/ByteStream.h"
+
+namespace facebook::velox {
+class GlutenByteInputStream : public ByteInputStream {
+ protected:
+  /// TODO Remove after refactoring SpillInput.
+  GlutenByteInputStream() {}
+
+ public:
+  explicit GlutenByteInputStream(std::vector<ByteRange> ranges) {
+    ranges_ = std::move(ranges);
+    VELOX_CHECK(!ranges_.empty());
+    current_ = &ranges_[0];
+  }
+
+  /// Disable copy constructor.
+  GlutenByteInputStream(const GlutenByteInputStream&) = delete;
+
+  /// Disable copy assignment operator.
+  GlutenByteInputStream& operator=(const GlutenByteInputStream& other) = 
delete;
+
+  /// Disable move constructor.
+  GlutenByteInputStream(GlutenByteInputStream&& other) noexcept = delete;
+
+  /// Enable move assignment operator.
+  GlutenByteInputStream& operator=(GlutenByteInputStream&& other) noexcept {
+    if (this != &other) {
+      ranges_ = std::move(other.ranges_);
+      current_ = other.current_;
+      other.current_ = nullptr;
+    }
+    return *this;
+  }
+
+  /// TODO Remove after refactoring SpillInput.
+  virtual ~GlutenByteInputStream() = default;
+
+  /// Returns total number of bytes available in the stream.
+  size_t size() const {
+    size_t total = 0;
+    for (const auto& range : ranges_) {
+      total += range.size;
+    }
+    return total;
+  }
+
+  /// Returns true if all input has been read.
+  ///
+  /// TODO: Remove 'virtual' after refactoring SpillInput.
+  virtual bool atEnd() const {
+    if (!current_) {
+      return false;
+    }
+    if (current_->position < current_->size) {
+      return false;
+    }
+
+    VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back());
+    return current_ == &ranges_.back();
+  }
+
+  /// Returns current position (number of bytes from the start) in the stream.
+  std::streampos tellp() const {
+    if (ranges_.empty()) {
+      return 0;
+    }
+    VELOX_DCHECK_NOT_NULL(current_);
+    int64_t size = 0;
+    for (auto& range : ranges_) {
+      if (&range == current_) {
+        return current_->position + size;
+      }
+      size += range.size;
+    }
+    VELOX_FAIL("GlutenByteInputStream 'current_' is not in 'ranges_'.");
+  }
+
+  /// Moves current position to specified one.
+  void seekp(std::streampos position) {
+    if (ranges_.empty() && position == 0) {
+      return;
+    }
+    int64_t toSkip = position;
+    for (auto& range : ranges_) {
+      if (toSkip <= range.size) {
+        current_ = &range;
+        current_->position = toSkip;
+        return;
+      }
+      toSkip -= range.size;
+    }
+    static_assert(sizeof(std::streamsize) <= sizeof(long long));
+    VELOX_FAIL("Seeking past end of GlutenByteInputStream: {}", 
static_cast<long long>(position));
+  }
+
+  /// Returns the remaining size left from current reading position.
+  size_t remainingSize() const {
+    if (ranges_.empty()) {
+      return 0;
+    }
+    const auto* lastRange = &ranges_[ranges_.size() - 1];
+    auto cur = current_;
+    size_t total = cur->size - cur->position;
+    while (++cur <= lastRange) {
+      total += cur->size;
+    }
+    return total;
+  }
+
+  std::string toString() const {
+    std::stringstream oss;
+    oss << ranges_.size() << " ranges (position/size) [";
+    for (const auto& range : ranges_) {
+      oss << "(" << range.position << "/" << range.size << (&range == current_ 
? " current" : "") << ")";
+      if (&range != &ranges_.back()) {
+        oss << ",";
+      }
+    }
+    oss << "]";
+    return oss.str();
+  }
+
+  uint8_t readByte() {
+    if (current_->position < current_->size) {
+      return current_->buffer[current_->position++];
+    }
+    next();
+    return readByte();
+  }
+
+  void readBytes(uint8_t* bytes, int32_t size) {
+    VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
+    int32_t offset = 0;
+    for (;;) {
+      int32_t available = current_->size - current_->position;
+      int32_t numUsed = std::min(available, size);
+      simd::memcpy(bytes + offset, current_->buffer + current_->position, 
numUsed);
+      offset += numUsed;
+      size -= numUsed;
+      current_->position += numUsed;
+      if (!size) {
+        return;
+      }
+      next();
+    }
+  }
+
+  template <typename T>
+  T read() {
+    if (current_->position + sizeof(T) <= current_->size) {
+      current_->position += sizeof(T);
+      return *reinterpret_cast<const T*>(current_->buffer + current_->position 
- sizeof(T));
+    }
+    // The number straddles two buffers. We read byte by byte and make
+    // a little-endian uint64_t. The bytes can be cast to any integer
+    // or floating point type since the wire format has the machine byte order.
+    static_assert(sizeof(T) <= sizeof(uint64_t));
+    uint64_t value = 0;
+    for (int32_t i = 0; i < sizeof(T); ++i) {
+      value |= static_cast<uint64_t>(readByte()) << (i * 8);
+    }
+    return *reinterpret_cast<const T*>(&value);
+  }
+
+  template <typename Char>
+  void readBytes(Char* data, int32_t size) {
+    readBytes(reinterpret_cast<uint8_t*>(data), size);
+  }
+
+  /// Returns a view over the read buffer for up to 'size' next
+  /// bytes. The size of the view may be less if the current byte
+  /// range ends within 'size' bytes from the current position.  The
+  /// size will be 0 if at end.
+  std::string_view nextView(int32_t size) {
+    VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes");
+    if (current_->position == current_->size) {
+      if (current_ == &ranges_.back()) {
+        return std::string_view(nullptr, 0);
+      }
+      next();
+    }
+    VELOX_CHECK(current_->size);
+    auto position = current_->position;
+    auto viewSize = std::min(current_->size - current_->position, size);
+    current_->position += viewSize;
+    return std::string_view(reinterpret_cast<char*>(current_->buffer) + 
position, viewSize);
+  }
+
+  void skip(int32_t size) {
+    VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes");
+    for (;;) {
+      int32_t available = current_->size - current_->position;
+      int32_t numUsed = std::min(available, size);
+      size -= numUsed;
+      current_->position += numUsed;
+      if (!size) {
+        return;
+      }
+      next();
+    }
+  }
+
+ protected:
+  /// Sets 'current_' to point to the next range of input. The
+  /// input is consecutive ByteRanges in 'ranges_' for the base class
+  /// but any view over external buffers can be made by specialization.
+  ///
+  /// TODO: Remove 'virtual' after refactoring SpillInput.
+  virtual void next(bool throwIfPastEnd = true) {
+    VELOX_CHECK(current_ >= &ranges_[0]);
+    size_t position = current_ - &ranges_[0];
+    VELOX_CHECK_LT(position, ranges_.size());
+    if (position == ranges_.size() - 1) {
+      if (throwIfPastEnd) {
+        VELOX_FAIL("Reading past end of GlutenByteInputStream");
+      }
+      return;
+    }
+    ++current_;
+    current_->position = 0;
+  }
+
+  // TODO: Remove after refactoring SpillInput.
+  const std::vector<ByteRange>& ranges() const {
+    return ranges_;
+  }
+
+  // TODO: Remove after refactoring SpillInput.
+  void setRange(ByteRange range) {
+    ranges_.resize(1);
+    ranges_[0] = range;
+    current_ = ranges_.data();
+  }
+};
+
+template <>
+inline Timestamp GlutenByteInputStream::read<Timestamp>() {
+  Timestamp value;
+  readBytes(reinterpret_cast<uint8_t*>(&value), sizeof(value));
+  return value;
+}
+
+template <>
+inline int128_t GlutenByteInputStream::read<int128_t>() {
+  int128_t value;
+  readBytes(reinterpret_cast<uint8_t*>(&value), sizeof(value));
+  return value;
+}
+} // namespace facebook::velox
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc 
b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 4d002499c..3966857b9 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -16,6 +16,7 @@
  */
 
 #include "VeloxShuffleReader.h"
+#include "GlutenByteStream.h"
 
 #include <arrow/array/array_binary.h>
 #include <arrow/io/buffered.h>
@@ -177,7 +178,7 @@ VectorPtr readFlatVector<TypeKind::VARBINARY>(
 std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
   std::vector<ByteRange> byteRanges;
   byteRanges.push_back(ByteRange{data, size, 0});
-  auto byteStream = std::make_unique<ByteInputStream>(byteRanges);
+  auto byteStream = std::make_unique<BufferInputStream>(byteRanges);
   return byteStream;
 }
 
@@ -450,7 +451,7 @@ std::shared_ptr<ColumnarBatch> 
VeloxSortShuffleReaderDeserializer::deserializeTo
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
-class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public 
facebook::velox::ByteInputStream {
+class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public 
facebook::velox::GlutenByteInputStream {
  public:
   VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, 
facebook::velox::BufferPtr buffer);
 
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index b51233983..f857ceada 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
 set -exu
 
 VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_08_15
+VELOX_BRANCH=2024_08_16
 VELOX_HOME=""
 
 OS=`uname -s`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to