This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6dcf83f75 [GLUTEN-6887][VL] Daily Update Velox Version (2024_08_16) (#6872)
6dcf83f75 is described below
commit 6dcf83f7527fc4e98f4cdc70b19d36b1080bcb82
Author: Gluten Performance Bot <[email protected]>
AuthorDate: Fri Aug 16 19:15:51 2024 +0800
[GLUTEN-6887][VL] Daily Update Velox Version (2024_08_16) (#6872)
---
cpp/velox/memory/VeloxMemoryManager.cc | 17 +-
.../serializer/VeloxColumnarBatchSerializer.cc | 2 +-
cpp/velox/shuffle/GlutenByteStream.h | 267 +++++++++++++++++++++
cpp/velox/shuffle/VeloxShuffleReader.cc | 5 +-
ep/build-velox/src/get_velox.sh | 2 +-
5 files changed, 284 insertions(+), 9 deletions(-)
diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc
index 0a57d6a99..6b5606dd2 100644
--- a/cpp/velox/memory/VeloxMemoryManager.cc
+++ b/cpp/velox/memory/VeloxMemoryManager.cc
@@ -63,11 +63,18 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator {
ListenableArbitrator(const Config& config, AllocationListener* listener)
: MemoryArbitrator(config),
listener_(listener),
- memoryPoolInitialCapacity_(
- getConfig<uint64_t>(config.extraConfigs,
kMemoryPoolInitialCapacity, kDefaultMemoryPoolInitialCapacity)),
- memoryPoolTransferCapacity_(
- getConfig<uint64_t>(config.extraConfigs,
kMemoryPoolTransferCapacity, kDefaultMemoryPoolTransferCapacity)) {
- }
+ memoryPoolInitialCapacity_(velox::config::toCapacity(
+ getConfig<std::string>(
+ config.extraConfigs,
+ kMemoryPoolInitialCapacity,
+ std::to_string(kDefaultMemoryPoolInitialCapacity)),
+ velox::config::CapacityUnit::BYTE)),
+ memoryPoolTransferCapacity_(velox::config::toCapacity(
+ getConfig<std::string>(
+ config.extraConfigs,
+ kMemoryPoolTransferCapacity,
+ std::to_string(kDefaultMemoryPoolTransferCapacity)),
+ velox::config::CapacityUnit::BYTE)) {}
std::string kind() const override {
return kind_;
}
diff --git a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
index 8b21e7bdb..acb14cf4d 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc
@@ -34,7 +34,7 @@ namespace {
std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
std::vector<ByteRange> byteRanges;
byteRanges.push_back(ByteRange{data, size, 0});
- auto byteStream = std::make_unique<ByteInputStream>(byteRanges);
+ auto byteStream = std::make_unique<BufferInputStream>(byteRanges);
return byteStream;
}
} // namespace
diff --git a/cpp/velox/shuffle/GlutenByteStream.h b/cpp/velox/shuffle/GlutenByteStream.h
new file mode 100644
index 000000000..78ea7b905
--- /dev/null
+++ b/cpp/velox/shuffle/GlutenByteStream.h
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+// TODO: Delete this file once the RSS sort shuffle reader is refactored.
+#include <algorithm>
+#include <cstring>
+#include <sstream>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "velox/common/memory/ByteStream.h"
+
+namespace facebook::velox {
+
+/// Reads sequentially from an in-memory list of ByteRanges.
+///
+/// Gluten keeps this copy for the RSS sort shuffle reader, whose
+/// VeloxInputStream derives from it. next() is virtual so a subclass can
+/// supply ranges over external buffers instead of a fixed 'ranges_' list.
+/// NOTE(review): this appears to mirror the range-based ByteInputStream
+/// that upstream Velox replaced with BufferInputStream; confirm before
+/// diverging from upstream behavior.
+class GlutenByteInputStream : public ByteInputStream {
+ protected:
+  /// TODO Remove after refactoring SpillInput.
+  GlutenByteInputStream() {}
+
+ public:
+  /// Creates a stream over 'ranges' whose first range becomes current.
+  /// Throws if 'ranges' is empty.
+  explicit GlutenByteInputStream(std::vector<ByteRange> ranges) {
+    ranges_ = std::move(ranges);
+    VELOX_CHECK(!ranges_.empty());
+    current_ = &ranges_[0];
+  }
+
+  /// Disable copy constructor.
+  GlutenByteInputStream(const GlutenByteInputStream&) = delete;
+
+  /// Disable copy assignment operator.
+  GlutenByteInputStream& operator=(const GlutenByteInputStream& other) = delete;
+
+  /// Disable move constructor.
+  GlutenByteInputStream(GlutenByteInputStream&& other) noexcept = delete;
+
+  /// Enable move assignment operator. Takes over 'other's ranges and current
+  /// position; 'other' is left without a current range.
+  GlutenByteInputStream& operator=(GlutenByteInputStream&& other) noexcept {
+    if (this != &other) {
+      ranges_ = std::move(other.ranges_);
+      current_ = other.current_;
+      other.current_ = nullptr;
+    }
+    return *this;
+  }
+
+  /// TODO Remove after refactoring SpillInput.
+  virtual ~GlutenByteInputStream() = default;
+
+  /// Returns total number of bytes available in the stream.
+  size_t size() const {
+    size_t total = 0;
+    for (const auto& range : ranges_) {
+      total += range.size;
+    }
+    return total;
+  }
+
+  /// Returns true if all input has been read, i.e. the current range is the
+  /// last one and is fully consumed. Returns false if there is no current
+  /// range.
+  ///
+  /// TODO: Remove 'virtual' after refactoring SpillInput.
+  virtual bool atEnd() const {
+    if (!current_) {
+      return false;
+    }
+    if (current_->position < current_->size) {
+      return false;
+    }
+
+    VELOX_CHECK(current_ >= ranges_.data() && current_ <= &ranges_.back());
+    return current_ == &ranges_.back();
+  }
+
+  /// Returns current position (number of bytes from the start) in the stream.
+  std::streampos tellp() const {
+    if (ranges_.empty()) {
+      return 0;
+    }
+    VELOX_DCHECK_NOT_NULL(current_);
+    int64_t size = 0;
+    for (const auto& range : ranges_) {
+      if (&range == current_) {
+        return current_->position + size;
+      }
+      size += range.size;
+    }
+    VELOX_FAIL("GlutenByteInputStream 'current_' is not in 'ranges_'.");
+  }
+
+  /// Moves current position to specified one. Throws if 'position' is past
+  /// the end of the stream.
+  void seekp(std::streampos position) {
+    if (ranges_.empty() && position == 0) {
+      return;
+    }
+    int64_t toSkip = position;
+    for (auto& range : ranges_) {
+      if (toSkip <= range.size) {
+        current_ = &range;
+        current_->position = toSkip;
+        return;
+      }
+      toSkip -= range.size;
+    }
+    static_assert(sizeof(std::streamsize) <= sizeof(long long));
+    VELOX_FAIL("Seeking past end of GlutenByteInputStream: {}", static_cast<long long>(position));
+  }
+
+  /// Returns the remaining size left from current reading position.
+  size_t remainingSize() const {
+    if (ranges_.empty()) {
+      return 0;
+    }
+    const auto* lastRange = &ranges_[ranges_.size() - 1];
+    auto cur = current_;
+    size_t total = cur->size - cur->position;
+    while (++cur <= lastRange) {
+      total += cur->size;
+    }
+    return total;
+  }
+
+  /// Returns a debug description of every range's position/size, marking
+  /// the current range.
+  std::string toString() const {
+    std::stringstream oss;
+    oss << ranges_.size() << " ranges (position/size) [";
+    for (const auto& range : ranges_) {
+      oss << "(" << range.position << "/" << range.size << (&range == current_ ? " current" : "") << ")";
+      if (&range != &ranges_.back()) {
+        oss << ",";
+      }
+    }
+    oss << "]";
+    return oss.str();
+  }
+
+  /// Reads one byte, advancing to the next range when the current one is
+  /// exhausted.
+  uint8_t readByte() {
+    if (current_->position < current_->size) {
+      return current_->buffer[current_->position++];
+    }
+    next();
+    return readByte();
+  }
+
+  /// Copies 'size' bytes into 'bytes', crossing range boundaries as needed.
+  void readBytes(uint8_t* bytes, int32_t size) {
+    VELOX_CHECK_GE(size, 0, "Attempting to read negative number of bytes");
+    int32_t offset = 0;
+    for (;;) {
+      int32_t available = current_->size - current_->position;
+      int32_t numUsed = std::min(available, size);
+      simd::memcpy(bytes + offset, current_->buffer + current_->position, numUsed);
+      offset += numUsed;
+      size -= numUsed;
+      current_->position += numUsed;
+      if (!size) {
+        return;
+      }
+      next();
+    }
+  }
+
+  /// Reads a value of type T stored in machine byte order. T must fit in 8
+  /// bytes; Timestamp and int128_t use the specializations below.
+  template <typename T>
+  T read() {
+    T value;
+    if (current_->position + sizeof(T) <= current_->size) {
+      // memcpy rather than dereferencing a reinterpret_cast pointer: the
+      // buffer offset is not guaranteed to be aligned for T.
+      std::memcpy(&value, current_->buffer + current_->position, sizeof(T));
+      current_->position += sizeof(T);
+      return value;
+    }
+    // The number straddles two buffers. We read byte by byte and make
+    // a little-endian uint64_t. The bytes can be cast to any integer
+    // or floating point type since the wire format has the machine byte order.
+    static_assert(sizeof(T) <= sizeof(uint64_t));
+    uint64_t bits = 0;
+    for (size_t i = 0; i < sizeof(T); ++i) {
+      bits |= static_cast<uint64_t>(readByte()) << (i * 8);
+    }
+    std::memcpy(&value, &bits, sizeof(T));
+    return value;
+  }
+
+  template <typename Char>
+  void readBytes(Char* data, int32_t size) {
+    readBytes(reinterpret_cast<uint8_t*>(data), size);
+  }
+
+  /// Returns a view over the read buffer for up to 'size' next
+  /// bytes. The size of the value may be less if the current byte
+  /// range ends within 'size' bytes from the current position. The
+  /// size will be 0 if at end.
+  std::string_view nextView(int32_t size) {
+    VELOX_CHECK_GE(size, 0, "Attempting to view negative number of bytes");
+    if (current_->position == current_->size) {
+      if (current_ == &ranges_.back()) {
+        return std::string_view(nullptr, 0);
+      }
+      next();
+    }
+    VELOX_CHECK(current_->size);
+    auto position = current_->position;
+    auto viewSize = std::min(current_->size - current_->position, size);
+    current_->position += viewSize;
+    return std::string_view(reinterpret_cast<char*>(current_->buffer) + position, viewSize);
+  }
+
+  /// Advances the read position by 'size' bytes, crossing range boundaries.
+  void skip(int32_t size) {
+    VELOX_CHECK_GE(size, 0, "Attempting to skip negative number of bytes");
+    for (;;) {
+      int32_t available = current_->size - current_->position;
+      int32_t numUsed = std::min(available, size);
+      size -= numUsed;
+      current_->position += numUsed;
+      if (!size) {
+        return;
+      }
+      next();
+    }
+  }
+
+ protected:
+  /// Sets 'current_' to point to the next range of input. The
+  /// input is consecutive ByteRanges in 'ranges_' for the base class
+  /// but any view over external buffers can be made by specialization.
+  /// When the current range is already the last one, throws if
+  /// 'throwIfPastEnd' is true and otherwise leaves the position unchanged.
+  ///
+  /// TODO: Remove 'virtual' after refactoring SpillInput.
+  virtual void next(bool throwIfPastEnd = true) {
+    VELOX_CHECK(current_ >= &ranges_[0]);
+    size_t position = current_ - &ranges_[0];
+    VELOX_CHECK_LT(position, ranges_.size());
+    if (position == ranges_.size() - 1) {
+      if (throwIfPastEnd) {
+        VELOX_FAIL("Reading past end of GlutenByteInputStream");
+      }
+      return;
+    }
+    ++current_;
+    current_->position = 0;
+  }
+
+  // TODO: Remove after refactoring SpillInput.
+  const std::vector<ByteRange>& ranges() const {
+    return ranges_;
+  }
+
+  // TODO: Remove after refactoring SpillInput.
+  void setRange(ByteRange range) {
+    ranges_.resize(1);
+    ranges_[0] = range;
+    current_ = ranges_.data();
+  }
+};
+
+template <>
+inline Timestamp GlutenByteInputStream::read<Timestamp>() {
+  Timestamp value;
+  readBytes(reinterpret_cast<uint8_t*>(&value), sizeof(value));
+  return value;
+}
+
+template <>
+inline int128_t GlutenByteInputStream::read<int128_t>() {
+  int128_t value;
+  readBytes(reinterpret_cast<uint8_t*>(&value), sizeof(value));
+  return value;
+}
+} // namespace facebook::velox
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 4d002499c..3966857b9 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -16,6 +16,7 @@
*/
#include "VeloxShuffleReader.h"
+#include "GlutenByteStream.h"
#include <arrow/array/array_binary.h>
#include <arrow/io/buffered.h>
@@ -177,7 +178,7 @@ VectorPtr readFlatVector<TypeKind::VARBINARY>(
std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
std::vector<ByteRange> byteRanges;
byteRanges.push_back(ByteRange{data, size, 0});
- auto byteStream = std::make_unique<ByteInputStream>(byteRanges);
+ auto byteStream = std::make_unique<BufferInputStream>(byteRanges);
return byteStream;
}
@@ -450,7 +451,7 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
-class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public
facebook::velox::ByteInputStream {
+class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public
facebook::velox::GlutenByteInputStream {
public:
VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input,
facebook::velox::BufferPtr buffer);
diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh
index b51233983..f857ceada 100755
--- a/ep/build-velox/src/get_velox.sh
+++ b/ep/build-velox/src/get_velox.sh
@@ -17,7 +17,7 @@
set -exu
VELOX_REPO=https://github.com/oap-project/velox.git
-VELOX_BRANCH=2024_08_15
+VELOX_BRANCH=2024_08_16
VELOX_HOME=""
OS=`uname -s`
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]