bkietz commented on a change in pull request #9768:
URL: https://github.com/apache/arrow/pull/9768#discussion_r616784836
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -182,10 +182,62 @@ struct TestGrouper {
ExpectConsume(*ExecBatch::Make(key_batch), expected);
}
+ void AssertEquivalentIds(const Datum& expected, const Datum& actual) {
Review comment:
Please include a comment describing what this does compared to
AssertDatumsEqual
##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -182,10 +182,62 @@ struct TestGrouper {
ExpectConsume(*ExecBatch::Make(key_batch), expected);
}
+ void AssertEquivalentIds(const Datum& expected, const Datum& actual) {
+ auto left = expected.make_array();
+ auto right = actual.make_array();
+ ASSERT_EQ(left->length(), right->length()) << "#ids unequal";
+ int64_t num_ids = left->length();
+ auto left_data = left->data();
+ auto right_data = right->data();
+ const uint32_t* left_ids =
+ reinterpret_cast<const uint32_t*>(left_data->buffers[1]->data());
+ const uint32_t* right_ids =
+ reinterpret_cast<const uint32_t*>(right_data->buffers[1]->data());
+ uint32_t max_left_id = 0;
+ uint32_t max_right_id = 0;
+ for (int64_t i = 0; i < num_ids; ++i) {
+ if (left_ids[i] > max_left_id) {
+ max_left_id = left_ids[i];
+ }
+ if (right_ids[i] > max_right_id) {
+ max_right_id = right_ids[i];
+ }
+ }
+ std::vector<bool> right_to_left_present;
Review comment:
std::vector can be sized and initialized on construction
```suggestion
std::vector<bool> right_to_left_present(max_right_id + 1, false);
```
##########
File path: cpp/src/arrow/CMakeLists.txt
##########
@@ -392,14 +392,45 @@ if(ARROW_COMPUTE)
compute/kernels/vector_hash.cc
compute/kernels/vector_nested.cc
compute/kernels/vector_selection.cc
- compute/kernels/vector_sort.cc)
+ compute/kernels/vector_sort.cc
+ engine/key_hash.cc
+ engine/key_map.cc
+ engine/key_compare.cc
+ engine/key_encode.cc
+ engine/groupby.cc
+ engine/util.cc)
if(ARROW_HAVE_RUNTIME_AVX2)
list(APPEND ARROW_SRCS compute/kernels/aggregate_basic_avx2.cc)
set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc
PROPERTIES
SKIP_PRECOMPILE_HEADERS ON)
set_source_files_properties(compute/kernels/aggregate_basic_avx2.cc
PROPERTIES
COMPILE_FLAGS ${ARROW_AVX2_FLAG})
+ list(APPEND ARROW_SRCS engine/key_hash_avx2.cc)
+ set_source_files_properties(engine/key_hash_avx2.cc PROPERTIES
+ SKIP_PRECOMPILE_HEADERS ON)
+ set_source_files_properties(engine/key_hash_avx2.cc PROPERTIES
+ COMPILE_FLAGS ${ARROW_AVX2_FLAG})
Review comment:
Let's extract this to a macro,
```cmake
macro(append_avx2_src SRC)
if(ARROW_HAVE_RUNTIME_AVX2)
list(APPEND ARROW_SRCS ${SRC})
set_source_files_properties(${SRC} PROPERTIES
SKIP_PRECOMPILE_HEADERS ON)
set_source_files_properties(${SRC} PROPERTIES
COMPILE_FLAGS ${ARROW_AVX2_FLAG})
endif()
endmacro()
```
##########
File path: cpp/src/arrow/engine/key_encode.h
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/engine/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace compute {
+
+/// Converts between key representation as a collection of arrays for
+/// individual columns and another representation as a single array of rows
+/// combining data from all columns into one value.
+/// This conversion is reversible.
+/// Row-oriented storage is beneficial when there is a need for random access
+/// of individual rows and at the same time all included columns are likely to
+/// be accessed together, as in the case of hash table key.
+class KeyEncoder {
+ public:
+ struct KeyEncoderContext {
+ bool has_avx2() const { return instr == util::CPUInstructionSet::avx2; }
+ util::CPUInstructionSet instr;
+ util::TempVectorStack* stack;
+ };
+
+ /// Description of a storage format for rows produced by encoder.
+ struct KeyRowMetadata {
+ uint32_t get_num_varbinary_cols() const {
+ return cumulative_lengths_length / sizeof(uint32_t);
+ }
+ /// Is row a varying-length binary, using offsets array to find a
beginning of a row,
+ /// or is it a fixed-length binary.
+ bool is_fixed_length;
+ /// For a fixed-length binary row, common size of rows in bytes.
+ /// For a varying-length binary, size of all encoded fixed-length key
columns.
+ /// Encoded fixed-length key columns in that case prefix the information
+ /// about all varying-length key columns.
+ uint32_t fixed_length;
+ /// Size in bytes of optional cumulative lengths of varying-length key
columns,
+ /// used when the row is not fixed length.
+ /// Zero for fixed-length row.
+ /// This number is equal to the number of varying-length key columns
multiplied
+ /// by sizeof(uint32_t), which is the size of a single cumulative length.
+ uint32_t cumulative_lengths_length;
+ /// Fixed number of bytes per row that are used to encode null masks.
+ /// Null masks indicate for a single row which of its key columns are null.
+ /// Nth bit in the sequence of bytes assigned to a row represents null
+ /// information for Nth key column.
+ int null_masks_bytes_per_row;
+ };
+
+ class KeyRowArray {
+ public:
+ KeyRowArray();
+ Status Init(MemoryPool* pool, const KeyRowMetadata& metadata);
+ void Clean();
+ Status AppendEmpty(uint32_t num_rows_to_append, uint32_t
num_extra_bytes_to_append);
+ Status AppendSelectionFrom(const KeyRowArray& from, uint32_t
num_rows_to_append,
+ const uint16_t* source_row_ids);
+ const KeyRowMetadata& get_metadata() const { return metadata_; }
+ int64_t get_length() const { return num_rows_; }
+ const uint8_t* data(int i) const {
+ ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+ return buffers_[i];
+ }
+ uint8_t* mutable_data(int i) {
+ ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+ return mutable_buffers_[i];
+ }
+ const uint32_t* get_offsets() const {
Review comment:
For simple accessors, please name functions in snake_case and without
prefixes like `get_`
```suggestion
const uint32_t* offsets() const {
```
##########
File path: cpp/src/arrow/engine/util.h
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#if defined(__clang__) || defined(__GNUC__)
+#define BYTESWAP(x) __builtin_bswap64(x)
+#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
+#elif defined(_MSC_VER)
+#include <intrin.h>
+#define BYTESWAP(x) _byteswap_uint64(x)
+#define ROTL(x, n) _rotl((x), (n))
+#endif
+
+namespace arrow {
+namespace util {
+
+enum class CPUInstructionSet {
+ scalar,
+ avx2, // All of: AVX2, BMI2
+ avx512 // In addition to avx2, all of: AVX512-F, AVX512-BW, AVX512-DQ,
AVX512-CD
+};
+
+/// Storage used to allocate temporary vectors of a batch size.
+/// Temporary vectors should resemble allocating temporary variables on the
stack
+/// but in the context of vectorized processing where we need to store a
vector of
+/// temporaries instead of a single value.
+class TempVectorStack {
+ template <typename>
+ friend class TempVectorHolder;
+
+ public:
+ Status Init(MemoryPool* pool, int64_t size) {
+ pool_ = pool;
+ num_vectors_ = 0;
+ top_ = 0;
+ buffer_size_ = size;
+ ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool_));
+ buffer_ = std::move(buffer);
+ return Status::OK();
+ }
+
+ private:
+ void alloc(uint32_t num_bytes, uint8_t*& data, int& id) {
+ int64_t old_top = top_;
+ top_ += num_bytes + padding;
+ // Stack overflow check
+ ARROW_DCHECK(top_ <= buffer_size_);
+ data = buffer_->mutable_data() + old_top;
+ id = num_vectors_++;
+ }
+ void release(int id, uint32_t num_bytes) {
+ ARROW_DCHECK(num_vectors_ == id + 1);
+ int64_t size = num_bytes + padding;
+ ARROW_DCHECK(top_ >= size);
+ top_ -= size;
+ --num_vectors_;
+ }
+ static constexpr int64_t padding = 64;
+ MemoryPool* pool_;
+ int num_vectors_;
+ int64_t top_;
+ std::unique_ptr<ResizableBuffer> buffer_;
+ int64_t buffer_size_;
+};
+
+template <typename T>
+class TempVectorHolder {
+ friend class TempVectorStack;
+
+ public:
+ ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); }
+ T* mutable_data() { return reinterpret_cast<T*>(data_); }
+ TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) {
+ stack_ = stack;
+ num_elements_ = num_elements;
+ stack_->alloc(num_elements * sizeof(T), data_, id_);
+ }
+
+ private:
+ TempVectorStack* stack_;
+ uint8_t* data_;
+ int id_;
+ uint32_t num_elements_;
+};
+
+class BitUtil {
+ public:
+ template <int bit_to_search = 1>
Review comment:
To avoid counterintuitive instantiation/linkage, please don't declare
templates in headers then instantiate them in source files. Templates should
either be publicly defined or confined to a single translation unit (declared
only in a source file).
##########
File path: cpp/src/arrow/engine/util.h
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#if defined(__clang__) || defined(__GNUC__)
+#define BYTESWAP(x) __builtin_bswap64(x)
+#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
+#elif defined(_MSC_VER)
+#include <intrin.h>
+#define BYTESWAP(x) _byteswap_uint64(x)
+#define ROTL(x, n) _rotl((x), (n))
+#endif
+
+namespace arrow {
+namespace util {
+
+enum class CPUInstructionSet {
+ scalar,
+ avx2, // All of: AVX2, BMI2
+ avx512 // In addition to avx2, all of: AVX512-F, AVX512-BW, AVX512-DQ,
AVX512-CD
+};
+
+/// Storage used to allocate temporary vectors of a batch size.
+/// Temporary vectors should resemble allocating temporary variables on the
stack
+/// but in the context of vectorized processing where we need to store a
vector of
+/// temporaries instead of a single value.
+class TempVectorStack {
+ template <typename>
+ friend class TempVectorHolder;
+
+ public:
+ Status Init(MemoryPool* pool, int64_t size) {
+ pool_ = pool;
+ num_vectors_ = 0;
+ top_ = 0;
+ buffer_size_ = size;
+ ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool_));
+ buffer_ = std::move(buffer);
+ return Status::OK();
+ }
+
+ private:
+ void alloc(uint32_t num_bytes, uint8_t*& data, int& id) {
+ int64_t old_top = top_;
+ top_ += num_bytes + padding;
+ // Stack overflow check
+ ARROW_DCHECK(top_ <= buffer_size_);
+ data = buffer_->mutable_data() + old_top;
+ id = num_vectors_++;
+ }
+ void release(int id, uint32_t num_bytes) {
+ ARROW_DCHECK(num_vectors_ == id + 1);
+ int64_t size = num_bytes + padding;
+ ARROW_DCHECK(top_ >= size);
+ top_ -= size;
+ --num_vectors_;
+ }
+ static constexpr int64_t padding = 64;
+ MemoryPool* pool_;
+ int num_vectors_;
+ int64_t top_;
+ std::unique_ptr<ResizableBuffer> buffer_;
Review comment:
```suggestion
std::unique_ptr<Buffer> buffer_;
```
##########
File path: cpp/src/arrow/engine/util.h
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#if defined(__clang__) || defined(__GNUC__)
+#define BYTESWAP(x) __builtin_bswap64(x)
+#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
+#elif defined(_MSC_VER)
+#include <intrin.h>
+#define BYTESWAP(x) _byteswap_uint64(x)
+#define ROTL(x, n) _rotl((x), (n))
+#endif
+
+namespace arrow {
+namespace util {
+
+enum class CPUInstructionSet {
+ scalar,
+ avx2, // All of: AVX2, BMI2
+ avx512 // In addition to avx2, all of: AVX512-F, AVX512-BW, AVX512-DQ,
AVX512-CD
+};
+
+/// Storage used to allocate temporary vectors of a batch size.
+/// Temporary vectors should resemble allocating temporary variables on the
stack
+/// but in the context of vectorized processing where we need to store a
vector of
+/// temporaries instead of a single value.
+class TempVectorStack {
+ template <typename>
+ friend class TempVectorHolder;
+
+ public:
+ Status Init(MemoryPool* pool, int64_t size) {
+ pool_ = pool;
+ num_vectors_ = 0;
+ top_ = 0;
+ buffer_size_ = size;
+ ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool_));
+ buffer_ = std::move(buffer);
+ return Status::OK();
+ }
+
+ private:
+ void alloc(uint32_t num_bytes, uint8_t*& data, int& id) {
+ int64_t old_top = top_;
+ top_ += num_bytes + padding;
+ // Stack overflow check
+ ARROW_DCHECK(top_ <= buffer_size_);
+ data = buffer_->mutable_data() + old_top;
+ id = num_vectors_++;
+ }
+ void release(int id, uint32_t num_bytes) {
+ ARROW_DCHECK(num_vectors_ == id + 1);
+ int64_t size = num_bytes + padding;
+ ARROW_DCHECK(top_ >= size);
+ top_ -= size;
+ --num_vectors_;
+ }
+ static constexpr int64_t padding = 64;
+ MemoryPool* pool_;
Review comment:
TempVectorStack doesn't seem to use this MemoryPool outside of init(),
could you remove it and add a comment describing the use of this class?
##########
File path: cpp/src/arrow/engine/key_encode.h
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/engine/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace compute {
+
+/// Converts between key representation as a collection of arrays for
+/// individual columns and another representation as a single array of rows
+/// combining data from all columns into one value.
+/// This conversion is reversible.
+/// Row-oriented storage is beneficial when there is a need for random access
+/// of individual rows and at the same time all included columns are likely to
+/// be accessed together, as in the case of hash table key.
+class KeyEncoder {
+ public:
+ struct KeyEncoderContext {
+ bool has_avx2() const { return instr == util::CPUInstructionSet::avx2; }
+ util::CPUInstructionSet instr;
+ util::TempVectorStack* stack;
+ };
+
+ /// Description of a storage format for rows produced by encoder.
+ struct KeyRowMetadata {
+ uint32_t get_num_varbinary_cols() const {
+ return cumulative_lengths_length / sizeof(uint32_t);
+ }
+ /// Is row a varying-length binary, using offsets array to find a
beginning of a row,
+ /// or is it a fixed-length binary.
+ bool is_fixed_length;
+ /// For a fixed-length binary row, common size of rows in bytes.
+ /// For a varying-length binary, size of all encoded fixed-length key
columns.
+ /// Encoded fixed-length key columns in that case prefix the information
+ /// about all varying-length key columns.
+ uint32_t fixed_length;
+ /// Size in bytes of optional cumulative lengths of varying-length key
columns,
+ /// used when the row is not fixed length.
+ /// Zero for fixed-length row.
+ /// This number is equal to the number of varying-length key columns
multiplied
+ /// by sizeof(uint32_t), which is the size of a single cumulative length.
+ uint32_t cumulative_lengths_length;
+ /// Fixed number of bytes per row that are used to encode null masks.
+ /// Null masks indicate for a single row which of its key columns are null.
+ /// Nth bit in the sequence of bytes assigned to a row represents null
+ /// information for Nth key column.
+ int null_masks_bytes_per_row;
+ };
+
+ class KeyRowArray {
+ public:
+ KeyRowArray();
+ Status Init(MemoryPool* pool, const KeyRowMetadata& metadata);
+ void Clean();
+ Status AppendEmpty(uint32_t num_rows_to_append, uint32_t
num_extra_bytes_to_append);
+ Status AppendSelectionFrom(const KeyRowArray& from, uint32_t
num_rows_to_append,
+ const uint16_t* source_row_ids);
+ const KeyRowMetadata& get_metadata() const { return metadata_; }
+ int64_t get_length() const { return num_rows_; }
+ const uint8_t* data(int i) const {
+ ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+ return buffers_[i];
+ }
+ uint8_t* mutable_data(int i) {
+ ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+ return mutable_buffers_[i];
+ }
+ const uint32_t* get_offsets() const {
+ return reinterpret_cast<const uint32_t*>(data(1));
+ }
+ uint32_t* get_mutable_offsets() {
+ return reinterpret_cast<uint32_t*>(mutable_data(1));
+ }
+ const uint8_t* get_null_masks() const { return null_masks_->data(); }
+ uint8_t* get_null_masks() { return null_masks_->mutable_data(); }
+
+ bool has_any_nulls(const KeyEncoderContext* ctx) const;
+
+ private:
+ Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
+ Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
+
+ int64_t size_null_masks(int64_t num_rows);
+ int64_t size_offsets(int64_t num_rows);
+ int64_t size_rows_fixed_length(int64_t num_rows);
+ int64_t size_rows_varying_length(int64_t num_bytes);
+ void update_buffer_pointers();
+
+ static constexpr int64_t padding_for_vectors = 64;
+ MemoryPool* pool_;
+ KeyRowMetadata metadata_;
+ /// Buffers can only expand during lifetime and never shrink.
+ std::unique_ptr<ResizableBuffer> null_masks_;
+ std::unique_ptr<ResizableBuffer> offsets_;
+ std::unique_ptr<ResizableBuffer> rows_;
+ static constexpr int max_buffers_ = 3;
+ const uint8_t* buffers_[max_buffers_];
+ uint8_t* mutable_buffers_[max_buffers_];
+ int64_t num_rows_;
+ int64_t rows_capacity_;
+ int64_t bytes_capacity_;
+
+ // Mutable to allow lazy evaluation
+ mutable int64_t num_rows_for_has_any_nulls_;
+ mutable bool has_any_nulls_;
+ };
+
+ /// Description of a storage format of a single key column as needed
+ /// for the purpose of row encoding.
+ struct KeyColumnMetadata {
+ KeyColumnMetadata() {}
+ KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in)
+ : is_fixed_length(is_fixed_length_in), fixed_length(fixed_length_in) {}
+ /// Is column storing a varying-length binary, using offsets array
+ /// to find a beginning of a value, or is it a fixed-length binary.
+ bool is_fixed_length;
+ /// For a fixed-length binary column: number of bytes per value.
+ /// Zero has a special meaning, indicating a bit vector with one bit per
value.
+ /// For a varying-length binary column: number of bytes per offset.
+ uint32_t fixed_length;
+ };
+
+ /// A lightweight description of an array representing one of key columns.
+ class KeyColumnArray {
+ public:
+ KeyColumnArray() {}
+ /// Create as a mix of buffers according to the mask from two descriptions
+ /// (Nth bit is set to 0 if Nth buffer from the first input
+ /// should be used and is set to 1 otherwise).
+ /// Metadata is inherited from the first input.
+ KeyColumnArray(const KeyColumnMetadata& metadata, const KeyColumnArray&
left,
+ const KeyColumnArray& right, int buffer_id_to_replace);
+ /// Create for reading
+ KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+ const uint8_t* buffer0, const uint8_t* buffer1,
+ const uint8_t* buffer2);
+ /// Create for writing
+ KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t*
buffer0,
+ uint8_t* buffer1, uint8_t* buffer2);
+ /// Create as a window view of original description that is offset
+ /// by a given number of rows.
+ /// The number of rows used in offset must be divisible by 8
+ /// in order to not split bit vectors within a single byte.
+ KeyColumnArray(const KeyColumnArray& from, int64_t start, int64_t length);
+ uint8_t* mutable_data(int i) {
+ ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+ return mutable_buffers_[i];
+ }
+ const uint8_t* data(int i) const {
+ ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+ return buffers_[i];
+ }
+ uint32_t* get_mutable_offsets() {
+ return reinterpret_cast<uint32_t*>(mutable_data(1));
+ }
+ const uint32_t* get_offsets() const {
+ return reinterpret_cast<const uint32_t*>(data(1));
+ }
+ const KeyColumnMetadata& get_metadata() const { return metadata_; }
+ int64_t get_length() const { return length_; }
+
+ private:
+ static constexpr int max_buffers_ = 3;
+ const uint8_t* buffers_[max_buffers_];
+ uint8_t* mutable_buffers_[max_buffers_];
+ KeyColumnMetadata metadata_;
+ int64_t length_;
+ };
+
+ void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext*
ctx);
+
+ const KeyRowMetadata& get_row_metadata() { return row_metadata_; }
+
+ /// Find out the required sizes of all buffers output buffers for encoding
+ /// (including varying-length buffers).
+ /// Use that information to resize provided row array so that it can fit
+ /// encoded data.
+ Status PrepareOutputForEncode(int64_t start_input_row, int64_t
num_input_rows,
+ KeyRowArray& rows,
+ const std::vector<KeyColumnArray>& all_cols);
+
+ /// Encode a window of column oriented data into the entire output
+ /// row oriented storage.
+ /// The output buffers for encoding need to be correctly sized before
+ /// starting encoding.
+ void Encode(int64_t start_input_row, int64_t num_input_rows, KeyRowArray&
rows,
+ const std::vector<KeyColumnArray>& cols);
+
+ /// Decode a window of row oriented data into a corresponding
+ /// window of column oriented storage.
+ /// The output buffers need to be correctly allocated and sized before
+ /// calling each method.
+ /// For that reason decoding is split into two functions.
+ /// The output of the first one, that processes everything except for
+ /// varying length buffers, can be used to find out required varying
+ /// length buffers sizes.
+ void DecodeFixedLengthBuffers(int64_t start_row_input, int64_t
start_row_output,
+ int64_t num_rows, const KeyRowArray& rows,
+ std::vector<KeyColumnArray>& cols);
+
+ void DecodeVaryingLengthBuffers(int64_t start_row_input, int64_t
start_row_output,
+ int64_t num_rows, const KeyRowArray& rows,
+ std::vector<KeyColumnArray>& cols);
+
+ private:
+ void PrepareMetadata(const std::vector<KeyColumnMetadata>& col_metadata,
+ KeyRowMetadata* out_row_metadata);
+
+ /// Prepare column array vectors.
+ /// Output column arrays represent a range of input column arrays
+ /// specified by starting row and number of rows.
+ /// Three vectors are generated:
+ /// - all columns
+ /// - fixed-length columns only
+ /// - varying-length columns only
+ void PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
+ const std::vector<KeyColumnArray>& cols_in,
+ std::vector<KeyColumnArray>* out_all_cols,
+ std::vector<KeyColumnArray>*
out_fixedbinary_cols,
+ std::vector<KeyColumnArray>* out_varbinary_cols,
+ std::vector<uint32_t>*
batch_varbinary_cols_base_offsets);
+
+ void GetOutputBufferSizeForEncode(int64_t start_row, int64_t num_rows,
+ const KeyRowMetadata& row_metadata,
+ const std::vector<KeyColumnArray>&
all_cols,
+ int64_t* out_num_bytes_required);
+
+ class TransformBoolean {
+ public:
+ static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+ const KeyColumnArray& temp);
+ static void PreEncode(const KeyColumnArray& input, KeyColumnArray& output,
+ KeyEncoderContext* ctx);
+ static void PostDecode(const KeyColumnArray& input, KeyColumnArray& output,
+ KeyEncoderContext* ctx);
+ };
+
+ class EncoderInteger {
+ public:
+ static void Encode(uint32_t* offset_within_row, KeyRowArray& rows,
+ const KeyColumnArray& col, KeyEncoderContext* ctx,
+ KeyColumnArray& temp);
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t*
offset_within_row,
+ const KeyRowArray& rows, KeyColumnArray& col,
+ KeyEncoderContext* ctx, KeyColumnArray& temp);
+ static bool UsesTransform(const KeyColumnArray& column);
+ static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+ KeyColumnArray& temp);
+ static void PreEncode(const KeyColumnArray& input, KeyColumnArray& output,
+ KeyEncoderContext* ctx);
+ static void PostDecode(const KeyColumnArray& input, KeyColumnArray& output,
+ KeyEncoderContext* ctx);
+
+ private:
+ static bool IsBoolean(const KeyColumnMetadata& metadata);
+ };
+
+ class EncoderBinary {
+ public:
+ static void Encode(uint32_t* offset_within_row, KeyRowArray& rows,
+ const KeyColumnArray& col, KeyEncoderContext* ctx,
+ KeyColumnArray& temp);
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t*
offset_within_row,
+ const KeyRowArray& rows, KeyColumnArray& col,
+ KeyEncoderContext* ctx, KeyColumnArray& temp);
+ static bool IsInteger(const KeyColumnMetadata& metadata);
+
+ private:
+ template <bool is_row_fixed_length, bool is_encoding, class COPY_FN>
+ static inline void EncodeDecodeHelper(uint32_t start_row, uint32_t
num_rows,
+ uint32_t offset_within_row,
+ const KeyRowArray* rows_const,
+ KeyRowArray* rows_mutable_maybe_null,
+ const KeyColumnArray* col_const,
+ KeyColumnArray*
col_mutable_maybe_null,
+ COPY_FN copy_fn);
+ template <bool is_row_fixed_length>
+ static void EncodeImp(uint32_t offset_within_row, KeyRowArray& rows,
+ const KeyColumnArray& col);
+ template <bool is_row_fixed_length>
+ static void DecodeImp(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const KeyRowArray& rows,
+ KeyColumnArray& col);
+#if defined(ARROW_HAVE_AVX2)
+ template <bool is_row_fixed_length>
+ static void EncodeImp_avx2(uint32_t offset_within_row, KeyRowArray& rows,
+ const KeyColumnArray& col);
+ template <bool is_row_fixed_length>
+ static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const KeyRowArray&
rows,
+ KeyColumnArray& col);
+#endif
+ static void ColumnMemsetNulls(uint32_t offset_within_row, KeyRowArray&
rows,
+ const KeyColumnArray& col,
KeyEncoderContext* ctx,
+ KeyColumnArray& temp_vector_16bit, uint8_t
byte_value);
+ template <bool is_row_fixed_length, uint32_t col_width>
+ static void ColumnMemsetNullsImp(uint32_t offset_within_row, KeyRowArray&
rows,
+ const KeyColumnArray& col,
KeyEncoderContext* ctx,
+ KeyColumnArray& temp_vector_16bit,
+ uint8_t byte_value);
+ };
+
+ class EncoderBinaryPair {
+ public:
+ static bool CanProcessPair(const KeyColumnMetadata& col1,
+ const KeyColumnMetadata& col2) {
+ return EncoderBinary::IsInteger(col1) && EncoderBinary::IsInteger(col2);
+ }
+ static void Encode(uint32_t* offset_within_row, KeyRowArray& rows,
+ const KeyColumnArray& col1, const KeyColumnArray& col2,
+ KeyEncoderContext* ctx, KeyColumnArray& temp1,
+ KeyColumnArray& temp2);
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t*
offset_within_row,
+ const KeyRowArray& rows, KeyColumnArray& col1,
+ KeyColumnArray& col2, KeyEncoderContext* ctx,
+ KeyColumnArray& temp1, KeyColumnArray& temp2);
+
+ private:
+ template <bool is_row_fixed_length, typename col1_type, typename col2_type>
+ static void EncodeImp(uint32_t num_rows_to_skip, uint32_t
offset_within_row,
+ KeyRowArray& rows, const KeyColumnArray& col1,
+ const KeyColumnArray& col2);
+ template <bool is_row_fixed_length, typename col1_type, typename col2_type>
+ static void DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
+ uint32_t num_rows, uint32_t offset_within_row,
+ const KeyRowArray& rows, KeyColumnArray& col1,
+ KeyColumnArray& col2);
+#if defined(ARROW_HAVE_AVX2)
+ template <bool is_row_fixed_length, uint32_t col_width>
+ static uint32_t EncodeImp_avx2(uint32_t offset_within_row, KeyRowArray&
rows,
+ const KeyColumnArray& col1,
+ const KeyColumnArray& col2);
+ template <bool is_row_fixed_length, uint32_t col_width>
+ static uint32_t DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const
KeyRowArray& rows,
+ KeyColumnArray& col1, KeyColumnArray& col2);
+#endif
+ };
+
+ class EncoderOffsets {
+ public:
+ // In order not to repeat work twice,
+ // encoding combines in a single pass computing of:
+ // a) row offsets for varying-length rows
+ // b) within each new row, the cumulative length array
+ // of varying-length values within a row.
+ static void Encode(KeyRowArray& rows,
+ const std::vector<KeyColumnArray>& varbinary_cols,
+ KeyEncoderContext* ctx);
+ static void Decode(uint32_t start_row, uint32_t num_rows, const
KeyRowArray& rows,
+ std::vector<KeyColumnArray>& varbinary_cols,
+ std::vector<uint32_t>& varbinary_cols_base_offset,
+ KeyEncoderContext* ctx);
+
+ private:
+ static void EncodeImp(uint32_t num_rows_already_processed, KeyRowArray&
rows,
+ const std::vector<KeyColumnArray>& varbinary_cols);
+#if defined(ARROW_HAVE_AVX2)
+ static uint32_t EncodeImp_avx2(KeyRowArray& rows,
+ const std::vector<KeyColumnArray>&
varbinary_cols,
+ KeyColumnArray& temp_buffer_32B_per_col);
+#endif
+ };
+
+ class EncoderVarBinary {
+ public:
+ static void Encode(uint32_t varbinary_col_id, KeyRowArray& rows,
+ const KeyColumnArray& col, KeyEncoderContext* ctx);
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t
varbinary_col_id,
+ const KeyRowArray& rows, KeyColumnArray& col,
+ KeyEncoderContext* ctx);
+
+ private:
+ template <bool first_varbinary_col, bool is_encoding, class COPY_FN>
+ static inline void EncodeDecodeHelper(uint32_t start_row, uint32_t
num_rows,
+ uint32_t varbinary_col_id,
+ const KeyRowArray* rows_const,
+ KeyRowArray* rows_mutable_maybe_null,
+ const KeyColumnArray* col_const,
+ KeyColumnArray*
col_mutable_maybe_null,
+ COPY_FN copy_fn);
+ template <bool first_varbinary_col>
+ static void EncodeImp(uint32_t varbinary_col_id, KeyRowArray& rows,
+ const KeyColumnArray& col);
+ template <bool first_varbinary_col>
+ static void DecodeImp(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const KeyRowArray& rows,
+ KeyColumnArray& col);
+#if defined(ARROW_HAVE_AVX2)
+ template <bool first_varbinary_col>
+ static void EncodeImp_avx2(uint32_t varbinary_col_id, KeyRowArray& rows,
+ const KeyColumnArray& col);
+ template <bool first_varbinary_col>
+ static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const KeyRowArray&
rows,
+ KeyColumnArray& col);
+#endif
+ };
+
+ class EncoderNulls {
+ public:
+ static void Encode(KeyRowArray& rows, const std::vector<KeyColumnArray>&
cols,
Review comment:
Please avoid mutable references. For out arguments, please use a mutable
pointer
```suggestion
static void Encode(KeyRowArray* rows, const std::vector<KeyColumnArray>&
cols,
```
##########
File path: cpp/src/arrow/engine/util_avx2.cc
##########
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <immintrin.h>
+
+#include "arrow/engine/util.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace util {
+
+#if defined(ARROW_HAVE_AVX2)
+
+template <int bit_to_search>
+void BitUtil::bits_to_indexes_avx2(const int num_bits, const uint8_t* bits,
+ int& num_indexes, uint16_t* indexes) {
+ // 64 bits at a time
+ constexpr int unroll = 64;
+
+ // The caller takes care of processing the remaining bits at the end outside
of the
+ // multiples of 64
+ ARROW_DCHECK(num_bits % unroll == 0);
+
+ uint8_t byte_indexes[64];
+ const uint64_t incr = 0x0808080808080808ULL;
+ const uint64_t mask = 0x0706050403020100ULL;
+ num_indexes = 0;
+ for (int i = 0; i < num_bits / unroll; ++i) {
+ uint64_t word = reinterpret_cast<const uint64_t*>(bits)[i];
+ if (bit_to_search == 0) {
+ word = ~word;
+ }
+ uint64_t base = 0;
+ int num_indexes_loop = 0;
+ while (word) {
+ uint64_t byte_indexes_next =
+ _pext_u64(mask, _pdep_u64(word, UINT64_C(0X0101010101010101)) *
0xff) + base;
+ *reinterpret_cast<uint64_t*>(byte_indexes + num_indexes_loop) =
byte_indexes_next;
+ base += incr;
+ num_indexes_loop += static_cast<int>(arrow::BitUtil::PopCount(word &
0xff));
+ word >>= 8;
+ }
+ // Unpack indexes to 16-bits and either add the base of i * 64 or shuffle
input
+ // indexes
+ for (int j = 0; j < (num_indexes_loop + 15) / 16; ++j) {
+ __m256i output = _mm256_cvtepi8_epi16(
+ _mm_loadu_si128(reinterpret_cast<const __m128i*>(byte_indexes) + j));
+ output = _mm256_add_epi16(output, _mm256_set1_epi16(i * 64));
+ _mm256_storeu_si256(((__m256i*)(indexes + num_indexes)) + j, output);
+ }
+ num_indexes += num_indexes_loop;
+ }
+}
+template void BitUtil::bits_to_indexes_avx2<0>(const int num_bits, const
uint8_t* bits,
+ int& num_indexes, uint16_t*
indexes);
+template void BitUtil::bits_to_indexes_avx2<1>(const int num_bits, const
uint8_t* bits,
+ int& num_indexes, uint16_t*
indexes);
+
+template <int bit_to_search>
+void BitUtil::bits_filter_indexes_avx2(const int num_bits, const uint8_t* bits,
+ const uint16_t* input_indexes, int&
num_indexes,
+ uint16_t* indexes) {
+ // 64 bits at a time
+ constexpr int unroll = 64;
+
+ // The caller takes care of processing the remaining bits at the end outside
of the
+ // multiples of 64
+ ARROW_DCHECK(num_bits % unroll == 0);
+
+ const uint64_t mask = 0xfedcba9876543210ULL;
+ num_indexes = 0;
+ for (int i = 0; i < num_bits / unroll; ++i) {
+ uint64_t word = reinterpret_cast<const uint64_t*>(bits)[i];
+ if (bit_to_search == 0) {
+ word = ~word;
+ }
+
+ int loop_id = 0;
+ while (word) {
+ uint64_t indexes_4bit =
+ _pext_u64(mask, _pdep_u64(word, UINT64_C(0x1111111111111111)) * 0xf);
+ // Unpack 4 bit indexes to 8 bits
+ __m256i indexes_8bit = _mm256_set1_epi64x(indexes_4bit);
+ indexes_8bit = _mm256_shuffle_epi8(
+ indexes_8bit, _mm256_setr_epi64x(0x0303020201010000ULL,
0x0707060605050404ULL,
+ 0x0303020201010000ULL,
0x0707060605050404ULL));
+ indexes_8bit = _mm256_blendv_epi8(
+ _mm256_and_si256(indexes_8bit, _mm256_set1_epi8(0x0f)),
+ _mm256_and_si256(_mm256_srli_epi32(indexes_8bit, 4),
_mm256_set1_epi8(0x0f)),
+ _mm256_set1_epi16(static_cast<uint16_t>(0xff00)));
+ __m256i input =
+ _mm256_loadu_si256(((const __m256i*)input_indexes) + 4 * i +
loop_id);
+ // Shuffle bytes to get low bytes in the first 128-bit lane and high
bytes in the
+ // second
+ input = _mm256_shuffle_epi8(
+ input, _mm256_setr_epi64x(0x0e0c0a0806040200ULL,
0x0f0d0b0907050301ULL,
+ 0x0e0c0a0806040200ULL,
0x0f0d0b0907050301ULL));
Review comment:
Please create `constexpr` variables for any magic numbers
##########
File path: cpp/src/arrow/engine/groupby.h
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/engine/key_compare.h"
+#include "arrow/engine/key_encode.h"
+#include "arrow/engine/key_hash.h"
+#include "arrow/engine/key_map.h"
+#include "arrow/engine/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace compute {
+
+class GroupMap {
Review comment:
GroupMap seems to be a pure implementation detail of GrouperFastImpl. If
there isn't a reason to keep the two classes distinct, could we consolidate
them into GrouperFastImpl
##########
File path: cpp/src/arrow/engine/key_encode.h
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/engine/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace compute {
+
+/// Converts between key representation as a collection of arrays for
+/// individual columns and another representation as a single array of rows
+/// combining data from all columns into one value.
+/// This conversion is reversible.
+/// Row-oriented storage is beneficial when there is a need for random access
+/// of individual rows and at the same time all included columns are likely to
+/// be accessed together, as in the case of hash table key.
+class KeyEncoder {
+ public:
+ struct KeyEncoderContext {
+ bool has_avx2() const { return instr == util::CPUInstructionSet::avx2; }
+ util::CPUInstructionSet instr;
+ util::TempVectorStack* stack;
+ };
+
+ /// Description of a storage format for rows produced by encoder.
+ struct KeyRowMetadata {
+ uint32_t get_num_varbinary_cols() const {
Review comment:
For member functions, please use CamelCase
```suggestion
uint32_t GetNumVarBinaryCols() const {
```
##########
File path: cpp/src/arrow/engine/util.h
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#if defined(__clang__) || defined(__GNUC__)
+#define BYTESWAP(x) __builtin_bswap64(x)
+#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
+#elif defined(_MSC_VER)
+#include <intrin.h>
+#define BYTESWAP(x) _byteswap_uint64(x)
+#define ROTL(x, n) _rotl((x), (n))
+#endif
+
+namespace arrow {
+namespace util {
+
+enum class CPUInstructionSet {
+ scalar,
+ avx2, // All of: AVX2, BMI2
+ avx512 // In addition to avx2, all of: AVX512-F, AVX512-BW, AVX512-DQ,
AVX512-CD
+};
+
+/// Storage used to allocate temporary vectors of a batch size.
+/// Temporary vectors should resemble allocating temporary variables on the
stack
+/// but in the context of vectorized processing where we need to store a
vector of
+/// temporaries instead of a single value.
+class TempVectorStack {
+ template <typename>
+ friend class TempVectorHolder;
+
+ public:
+ Status Init(MemoryPool* pool, int64_t size) {
+ pool_ = pool;
+ num_vectors_ = 0;
+ top_ = 0;
+ buffer_size_ = size;
+ ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool_));
+ buffer_ = std::move(buffer);
+ return Status::OK();
+ }
+
+ private:
+ void alloc(uint32_t num_bytes, uint8_t*& data, int& id) {
+ int64_t old_top = top_;
+ top_ += num_bytes + padding;
+ // Stack overflow check
+ ARROW_DCHECK(top_ <= buffer_size_);
+ data = buffer_->mutable_data() + old_top;
+ id = num_vectors_++;
+ }
+ void release(int id, uint32_t num_bytes) {
+ ARROW_DCHECK(num_vectors_ == id + 1);
+ int64_t size = num_bytes + padding;
+ ARROW_DCHECK(top_ >= size);
+ top_ -= size;
+ --num_vectors_;
+ }
+ static constexpr int64_t padding = 64;
+ MemoryPool* pool_;
+ int num_vectors_;
+ int64_t top_;
+ std::unique_ptr<ResizableBuffer> buffer_;
+ int64_t buffer_size_;
+};
+
+template <typename T>
+class TempVectorHolder {
+ friend class TempVectorStack;
+
+ public:
+ ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); }
+ T* mutable_data() { return reinterpret_cast<T*>(data_); }
+ TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) {
+ stack_ = stack;
+ num_elements_ = num_elements;
+ stack_->alloc(num_elements * sizeof(T), data_, id_);
+ }
+
+ private:
+ TempVectorStack* stack_;
+ uint8_t* data_;
+ int id_;
+ uint32_t num_elements_;
+};
+
+class BitUtil {
+ public:
+ template <int bit_to_search = 1>
+ static void bits_to_indexes(CPUInstructionSet instruction_set, const int
num_bits,
+ const uint8_t* bits, int& num_indexes, uint16_t*
indexes);
+
+ template <int bit_to_search = 1>
+ static void bits_filter_indexes(CPUInstructionSet instruction_set, const int
num_bits,
+ const uint8_t* bits, const uint16_t*
input_indexes,
+ int& num_indexes, uint16_t* indexes);
+
+ // Input and output indexes may be pointing to the same data (in-place
filtering).
+ static void bits_split_indexes(CPUInstructionSet instruction_set, const int
num_bits,
+ const uint8_t* bits, int& num_indexes_bit0,
+ uint16_t* indexes_bit0, uint16_t*
indexes_bit1);
+
+ // Bit 1 is replaced with byte 0xFF.
+ static void bits_to_bytes(CPUInstructionSet instruction_set, const int
num_bits,
+ const uint8_t* bits, uint8_t* bytes);
Review comment:
These bit utilities are generally useful, please put them into a header
so others could take advantage of them. For example
cpp/src/arrow/util/bitmap_builders.h has BytesToBits, for which this should
become an overload.
##########
File path: cpp/src/arrow/engine/key_map.h
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+
+#include "arrow/engine/util.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace compute {
+
+//
+// 0 byte - 7 bucket | 1. byte - 6 bucket | ...
+// ---------------------------------------------------
+// | Empty bit* | Empty bit |
+// ---------------------------------------------------
+// | 7-bit hash | 7-bit hash |
+// ---------------------------------------------------
+// * Empty bucket has value 0x80. Non-empty bucket has highest bit set to 0.
+// ** The order of bytes is reversed - highest byte represents 0th bucket.
+// No other part of data structure uses this reversed order.
+//
+class SwissTable {
Review comment:
Please add a higher level doc comment describing this class and it's
utility. Specifically: describe how equality comparison and appending/storage
of new entries are deferred to callbacks.
Separately, add a comment detailing its implementation and usage of stamps
for vectorized probing.
##########
File path: cpp/src/arrow/engine/util.h
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+#if defined(__clang__) || defined(__GNUC__)
+#define BYTESWAP(x) __builtin_bswap64(x)
+#define ROTL(x, n) (((x) << (n)) | ((x) >> (32 - (n))))
+#elif defined(_MSC_VER)
+#include <intrin.h>
+#define BYTESWAP(x) _byteswap_uint64(x)
+#define ROTL(x, n) _rotl((x), (n))
+#endif
+
+namespace arrow {
+namespace util {
+
+enum class CPUInstructionSet {
+ scalar,
+ avx2, // All of: AVX2, BMI2
+ avx512 // In addition to avx2, all of: AVX512-F, AVX512-BW, AVX512-DQ,
AVX512-CD
+};
Review comment:
Instead of this, please use `arrow::internal::CpuInfo::{AVX2, AVX512}`
or `arrow::compute::SimdLevel::{NONE, AVX2, AVX512}`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]