bkietz commented on code in PR #35565:
URL: https://github.com/apache/arrow/pull/35565#discussion_r1202738729
##########
cpp/src/arrow/acero/exec_plan.h:
##########
@@ -555,6 +565,34 @@ struct ARROW_ACERO_EXPORT QueryOptions {
///
/// If set then the number of names must equal the number of output columns
std::vector<std::string> field_names;
+
+ /// Various compute functions and acero internals will type pun array
+ /// buffers from uint8_t* to some kind of value type (e.g. we might
+ /// cast to int32_t* to add two int32 arrays)
+ ///
+ /// If the buffer is poorly aligned (e.g. an int32 array is not aligned
+ /// on a 4-byte boundary) then this is technically undefined behavior.
+ /// However, most modern compilers and CPUs are fairly tolerant of this
+ /// behavior and nothing bad (beyond a small hit to performance) is likely
+ /// to happen.
+ ///
+ /// Note that this only applies to source buffers. All buffers allocated
internally
+ /// by Acero will be suitably aligned.
+ ///
+ /// If this field is set to kWarn then Acero will check if any buffers are
unaligned
Review Comment:
Perhaps this section of the comment could be broken into comments for the
members of `UnalignedBufferHandling`?
##########
cpp/src/arrow/util/align_util_test.cc:
##########
@@ -278,4 +279,225 @@ TEST(EnsureAlignment, Table) {
ASSERT_EQ(util::CheckAlignment(*aligned_table, 2048, &needs_alignment),
true);
}
+using TypesRequiringSomeKindOfAlignment =
+ testing::Types<Int16Type, Int32Type, Int64Type, UInt16Type, UInt32Type,
UInt64Type,
+ FloatType, DoubleType, Date32Type, Date64Type, Time32Type,
Time64Type,
+ Decimal128Type, Decimal256Type, TimestampType,
DurationType, MapType,
+ DenseUnionType, LargeBinaryType, LargeListType,
LargeStringType,
+ MonthIntervalType, DayTimeIntervalType,
MonthDayNanoIntervalType>;
+
+using TypesNotRequiringAlignment =
+ testing::Types<NullType, Int8Type, UInt8Type, FixedSizeListType,
FixedSizeBinaryType,
+ BooleanType, SparseUnionType>;
+
+TEST(EnsureAlignment, Malloc) {}
+
+template <typename ArrowType>
+std::shared_ptr<DataType> sample_type() {
+ return TypeTraits<ArrowType>::type_singleton();
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<FixedSizeBinaryType>() {
+ return fixed_size_binary(16);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<FixedSizeListType>() {
+ return fixed_size_list(uint8(), 16);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Decimal128Type>() {
+ return decimal128(32, 6);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Decimal256Type>() {
+ return decimal256(60, 10);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<LargeListType>() {
+ return large_list(int8());
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<DenseUnionType>() {
+ return dense_union({field("x", int8()), field("y", uint8())});
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<MapType>() {
+ return map(utf8(), field("item", utf8()));
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<DurationType>() {
+ return duration(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<TimestampType>() {
+ return timestamp(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Time32Type>() {
+ return time32(TimeUnit::SECOND);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<Time64Type>() {
+ return time64(TimeUnit::NANO);
+}
+
+template <>
+std::shared_ptr<DataType> sample_type<SparseUnionType>() {
+ return sparse_union({field("x", uint8()), field("y", int8())}, {1, 2});
+}
+
+template <typename ArrowType>
+std::shared_ptr<ArrayData> SampleArray() {
+ random::RandomArrayGenerator gen(42);
+ return gen.ArrayOf(sample_type<ArrowType>(), 100)->data();
+}
+
+template <>
+std::shared_ptr<ArrayData> SampleArray<SparseUnionType>() {
+ auto ty = sparse_union({field("ints", int64()), field("strs", utf8())}, {2,
7});
+ auto ints = ArrayFromJSON(int64(), "[0, 1, 2, 3]");
+ auto strs = ArrayFromJSON(utf8(), R"(["a", null, "c", "d"])");
+ auto ids = ArrayFromJSON(int8(), "[2, 7, 2, 7]")->data()->buffers[1];
+ const int length = 4;
+ SparseUnionArray arr(ty, length, {ints, strs}, ids);
+ return arr.data();
+}
+
+class MallocAlignment : public ::testing::Test {
Review Comment:
```suggestion
class ValueAlignment : public ::testing::Test {
```
##########
cpp/src/arrow/acero/plan_test.cc:
##########
@@ -1704,5 +1704,43 @@ TEST(ExecPlanExecution,
SegmentedAggregationWithBatchCrossingSegment) {
{expected});
}
+TEST(ExecPlanExecution, UnalignedInput) {
+ std::shared_ptr<Array> array = ArrayFromJSON(int32(), "[1, 2, 3]");
+ std::shared_ptr<Array> unaligned = UnalignValues(*array);
+ ASSERT_OK_AND_ASSIGN(ExecBatch sample_batch,
+ ExecBatch::Make({unaligned}, array->length()));
+
+ BatchesWithSchema data;
+ data.batches = {std::move(sample_batch)};
+ data.schema = schema({field("i32", int32())});
+
+ Declaration plan = Declaration::Sequence({
+ {"exec_batch_source", ExecBatchSourceNodeOptions(data.schema,
data.batches)},
+ });
+
+ int64_t initial_bytes_allocated =
default_memory_pool()->total_bytes_allocated();
+
+ // By default we should warn and so the plan should finish ok
+ ASSERT_OK(DeclarationToStatus(plan));
+ ASSERT_EQ(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
+
+ QueryOptions query_options;
+
+ // Nothing should happen if we ignore alignment
+ query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
+ ASSERT_OK(DeclarationToStatus(plan, query_options));
+ ASSERT_EQ(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
Review Comment:
```suggestion
#ifndef ARROW_UBSAN
query_options.unaligned_buffer_handling = UnalignedBufferHandling::kIgnore;
ASSERT_OK(DeclarationToStatus(plan, query_options));
ASSERT_EQ(initial_bytes_allocated,
default_memory_pool()->total_bytes_allocated());
#endif
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]