pitrou commented on a change in pull request #8894: URL: https://github.com/apache/arrow/pull/8894#discussion_r552790445
########## File path: cpp/src/arrow/compute/kernels/scalar_cast_internal.h ########## @@ -62,6 +63,13 @@ void AddSimpleCast(InputType in_ty, OutputType out_ty, CastFunction* func) { CastFunctor<OutType, InType>::Exec)); } +template <typename InType, typename OutType> +void AddSimpleArrayOnlyCast(InputType in_ty, OutputType out_ty, CastFunction* func) { Review comment: Add comment as for `AddSimpleCast` above? ########## File path: cpp/src/arrow/compute/cast.cc ########## @@ -118,8 +118,86 @@ class CastMetaFunction : public MetaFunction { } // namespace +const FunctionDoc project_doc{"Wrap Arrays into a StructArray", Review comment: Can you put this inside the anonymous namespace above? ########## File path: cpp/src/arrow/compute/kernels/scalar_cast_test.cc ########## @@ -1801,6 +1801,10 @@ TYPED_TEST(TestDictionaryCast, Basic) { // TODO: Should casting dictionary scalars work? Review comment: Is this still a TODO? ########## File path: cpp/src/arrow/compute/kernels/util_internal.h ########## @@ -18,8 +18,12 @@ #pragma once #include <cstdint> +#include <functional> Review comment: Is this one useful? ########## File path: cpp/src/arrow/dataset/expression.h ########## @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// This API is EXPERIMENTAL. + +#pragma once + +#include <atomic> +#include <memory> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "arrow/compute/type_fwd.h" +#include "arrow/dataset/type_fwd.h" +#include "arrow/dataset/visibility.h" +#include "arrow/datum.h" +#include "arrow/type_fwd.h" +#include "arrow/util/variant.h" + +namespace arrow { +namespace dataset { + +/// An unbound expression which maps a single Datum to another Datum. +/// An expression is one of +/// - A literal Datum. +/// - A reference to a single (potentially nested) field of the input Datum. +/// - A call to a compute function, with arguments specified by other Expressions. +class ARROW_DS_EXPORT Expression { Review comment: By the way, perhaps tag these APIs experimental, so that we can change them without warning? ########## File path: cpp/src/arrow/compute/cast.cc ########## @@ -118,8 +118,86 @@ class CastMetaFunction : public MetaFunction { } // namespace +const FunctionDoc project_doc{"Wrap Arrays into a StructArray", Review comment: Also, nit but I don't understand why this is in `cast.{h,cc}`. I would expect to only find the cast functions here. A new `scalar_nested.{h,cc}` would seem a more logical place. ########## File path: cpp/src/arrow/compute/cast.cc ########## @@ -118,8 +118,86 @@ class CastMetaFunction : public MetaFunction { } // namespace +const FunctionDoc project_doc{"Wrap Arrays into a StructArray", Review comment: (I'm also curious why it's called "project". It sounds rather imprecise, though it may be the conventional term for this operation?) ########## File path: cpp/src/arrow/compute/kernels/util_internal.h ########## @@ -50,6 +54,12 @@ int GetBitWidth(const DataType& type); // rather than duplicating compiled code to do all these in each kernel. 
PrimitiveArg GetPrimitiveArg(const ArrayData& arr); +// Augment a unary ArrayKernelExec which supports only array-like inputs with support for +// scalar inputs. Scalars will be transformed to 1-long arrays which are passed to the +// original exec. This could be far more efficient, but instead of optimizing this it'd be +// better to support scalar inputs "upstream" in original exec. Review comment: Add a word about null behaviour? ########## File path: cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc ########## @@ -256,14 +251,47 @@ struct CastFunctor<Date64Type, Date32Type> { template <> struct CastFunctor<Date32Type, Date64Type> { static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // TODO: Make this work on scalar inputs DCHECK_EQ(batch[0].kind(), Datum::ARRAY); ShiftTime<int64_t, int32_t>(ctx, util::DIVIDE, kMillisecondsInDay, *batch[0].array(), out->mutable_array()); } }; +// ---------------------------------------------------------------------- +// date32, date64 to timestamp + +template <> +struct CastFunctor<TimestampType, Date32Type> { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + DCHECK_EQ(batch[0].kind(), Datum::ARRAY); + + const auto& out_type = checked_cast<const TimestampType&>(*out->type()); + // get conversion SECOND -> unit + auto conversion = util::GetTimestampConversion(TimeUnit::SECOND, out_type.unit()); + DCHECK_EQ(conversion.first, util::MULTIPLY); + + // multiply to achieve days -> unit + conversion.second *= kMillisecondsInDay / 1000; + ShiftTime<int32_t, int64_t>(ctx, util::MULTIPLY, conversion.second, *batch[0].array(), + out->mutable_array()); + } +}; + +template <> +struct CastFunctor<TimestampType, Date64Type> { + static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + DCHECK_EQ(batch[0].kind(), Datum::ARRAY); + + const auto& out_type = checked_cast<const TimestampType&>(*out->type()); + + // date64 is ms since epoch + auto conversion = 
util::GetTimestampConversion(TimeUnit::MILLI, out_type.unit()); + ShiftTime<int64_t, int64_t>(ctx, conversion.first, conversion.second, + *batch[0].array(), out->mutable_array()); Review comment: For the record, is there a fast path when `conversion.second == 1`? Otherwise, perhaps create a JIRA for it. ########## File path: cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc ########## @@ -282,7 +282,9 @@ struct ParseString { OutValue Call(KernelContext* ctx, Arg0Value val) const { OutValue result = OutValue(0); if (ARROW_PREDICT_FALSE(!ParseValue<OutType>(val.data(), val.size(), &result))) { - ctx->SetStatus(Status::Invalid("Failed to parse string: ", val)); + ctx->SetStatus(Status::Invalid("Failed to parse string: '", val, + "' as a scalar of type ", + TypeTraits<OutType>::type_singleton()->ToString())); Review comment: +1 ########## File path: cpp/src/arrow/compute/kernels/scalar_project_test.cc ########## @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <cstdint> +#include <cstdio> +#include <functional> +#include <memory> +#include <string> +#include <vector> + +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/chunked_array.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" +#include "arrow/type_traits.h" +#include "arrow/util/decimal.h" + +#include "arrow/compute/api_vector.h" +#include "arrow/compute/cast.h" +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/test_util.h" + +namespace arrow { +namespace compute { + +struct { + public: + Result<Datum> operator()(std::vector<Datum> args) { + ProjectOptions opts{field_names}; + return CallFunction("project", args, &opts); + } + + std::vector<std::string> field_names; +} Project; + +TEST(Project, Scalar) { + std::shared_ptr<StructScalar> expected(new StructScalar{{}, struct_({})}); + ASSERT_OK_AND_EQ(Datum(expected), Project({})); + + auto i32 = MakeScalar(1); + auto f64 = MakeScalar(2.5); + auto str = MakeScalar("yo"); + + expected.reset(new StructScalar{ + {i32, f64, str}, + struct_({field("i", i32->type), field("f", f64->type), field("s", str->type)})}); + Project.field_names = {"i", "f", "s"}; + ASSERT_OK_AND_EQ(Datum(expected), Project({i32, f64, str})); + + // Three field names but one input value + ASSERT_RAISES(Invalid, Project({str})); +} + +TEST(Project, Array) { + Project.field_names = {"i", "s"}; + auto i32 = ArrayFromJSON(int32(), "[42, 13, 7]"); + auto str = ArrayFromJSON(utf8(), R"(["aa", "aa", "aa"])"); + ASSERT_OK_AND_ASSIGN(Datum expected, + StructArray::Make({i32, str}, Project.field_names)); + + ASSERT_OK_AND_EQ(expected, Project({i32, str})); + + // Scalars are broadcast to the length of the arrays + ASSERT_OK_AND_EQ(expected, Project({i32, MakeScalar("aa")})); + + // Array length mismatch + ASSERT_RAISES(Invalid, Project({i32->Slice(1), str})); +} + +TEST(Project, ChunkedArray) { + Project.field_names = {"i", 
"s"}; + + auto i32_0 = ArrayFromJSON(int32(), "[42, 13, 7]"); + auto i32_1 = ArrayFromJSON(int32(), "[]"); + auto i32_2 = ArrayFromJSON(int32(), "[32, 0]"); + + auto str_0 = ArrayFromJSON(utf8(), R"(["aa", "aa", "aa"])"); + auto str_1 = ArrayFromJSON(utf8(), "[]"); + auto str_2 = ArrayFromJSON(utf8(), R"(["aa", "aa"])"); + + ASSERT_OK_AND_ASSIGN(auto i32, ChunkedArray::Make({i32_0, i32_1, i32_2})); + ASSERT_OK_AND_ASSIGN(auto str, ChunkedArray::Make({str_0, str_1, str_2})); + + ASSERT_OK_AND_ASSIGN(auto expected_0, + StructArray::Make({i32_0, str_0}, Project.field_names)); + ASSERT_OK_AND_ASSIGN(auto expected_1, + StructArray::Make({i32_1, str_1}, Project.field_names)); + ASSERT_OK_AND_ASSIGN(auto expected_2, + StructArray::Make({i32_2, str_2}, Project.field_names)); + ASSERT_OK_AND_ASSIGN(Datum expected, + ChunkedArray::Make({expected_0, expected_1, expected_2})); + + ASSERT_OK_AND_EQ(expected, Project({i32, str})); Review comment: Will it also work if chunking is different (but with the same overall length)? ########## File path: cpp/src/arrow/dataset/partition.cc ########## @@ -573,5 +530,192 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema( return factory()->Inspect(paths); } +// Transform an array of counts to offsets which will divide a ListArray +// into an equal number of slices with corresponding lengths. +inline Result<std::shared_ptr<Array>> CountsToOffsets( + std::shared_ptr<Int64Array> counts) { + Int32Builder offset_builder; + RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1)); + offset_builder.UnsafeAppend(0); + + for (int64_t i = 0; i < counts->length(); ++i) { + DCHECK_NE(counts->Value(i), 0); + auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i)); + offset_builder.UnsafeAppend(next_offset); + } + + std::shared_ptr<Array> offsets; + RETURN_NOT_OK(offset_builder.Finish(&offsets)); + return offsets; +} + +// Helper for simultaneous dictionary encoding of multiple arrays. 
+// +// The fused dictionary is the Cartesian product of the individual dictionaries. +// For example given two arrays A, B where A has unique values ["ex", "why"] +// and B has unique values [0, 1] the fused dictionary is the set of tuples +// [["ex", 0], ["ex", 1], ["why", 0], ["why", 1]]. +// +// TODO(bkietz) this capability belongs in an Action of the hash kernels, where +// it can be used to group aggregates without materializing a grouped batch. +// For the purposes of writing we need the materialized grouped batch anyway +// since no Writers accept a selection vector. +class StructDictionary { + public: + struct Encoded { + std::shared_ptr<Int32Array> indices; + std::shared_ptr<StructDictionary> dictionary; + }; + + static Result<Encoded> Encode(const ArrayVector& columns) { + Encoded out{nullptr, std::make_shared<StructDictionary>()}; + + for (const auto& column : columns) { + if (column->null_count() != 0) { + return Status::NotImplemented("Grouping on a field with nulls"); + } + + RETURN_NOT_OK(out.dictionary->AddOne(column, &out.indices)); + } + + return out; + } + + Result<std::shared_ptr<StructArray>> Decode(std::shared_ptr<Int32Array> fused_indices, + FieldVector fields) { + std::vector<Int32Builder> builders(dictionaries_.size()); + for (Int32Builder& b : builders) { + RETURN_NOT_OK(b.Resize(fused_indices->length())); + } + + std::vector<int32_t> codes(dictionaries_.size()); + for (int64_t i = 0; i < fused_indices->length(); ++i) { + Expand(fused_indices->Value(i), codes.data()); + + auto builder_it = builders.begin(); + for (int32_t index : codes) { + builder_it++->UnsafeAppend(index); + } + } + + ArrayVector columns(dictionaries_.size()); + for (size_t i = 0; i < dictionaries_.size(); ++i) { + std::shared_ptr<ArrayData> indices; + RETURN_NOT_OK(builders[i].FinishInternal(&indices)); + + ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices)); + columns[i] = column.make_array(); + } + + return 
StructArray::Make(std::move(columns), std::move(fields)); + } + + private: + Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) { + ArrayData* encoded; + if (column.type()->id() != Type::DICTIONARY) { + ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column)); + } + encoded = column.mutable_array(); + + auto indices = + std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1])); + + dictionaries_.push_back(MakeArray(std::move(encoded->dictionary))); + auto dictionary_size = static_cast<int32_t>(dictionaries_.back()->length()); + + if (*fused_indices == nullptr) { + *fused_indices = std::move(indices); + size_ = dictionary_size; + return Status::OK(); + } + + // It's useful to think about the case where each of dictionaries_ has size 10. + // In this case the decimal digit in the ones place is the code in dictionaries_[0], + // the tens place corresponds to dictionaries_[1], etc. + // The incumbent indices must be shifted to the hundreds place so as not to collide. + ARROW_ASSIGN_OR_RAISE(Datum new_fused_indices, + compute::Multiply(indices, MakeScalar(size_))); + + ARROW_ASSIGN_OR_RAISE(new_fused_indices, + compute::Add(new_fused_indices, *fused_indices)); + + *fused_indices = checked_pointer_cast<Int32Array>(new_fused_indices.make_array()); + + // XXX should probably cap this at 2**15 or so + ARROW_CHECK(!internal::MultiplyWithOverflow(size_, dictionary_size, &size_)); Review comment: Ping. ########## File path: cpp/src/arrow/dataset/partition.h ########## @@ -294,5 +285,22 @@ class ARROW_DS_EXPORT PartitioningOrFactory { std::shared_ptr<Partitioning> partitioning_; }; +/// \brief Assemble lists of indices of identical rows. +/// +/// \param[in] by A StructArray whose columns will be used as grouping criteria. +/// \return A StructArray mapping unique rows (in field "values", represented as a +/// StructArray with the same fields as `by`) to lists of indices where +/// that row appears (in field "groupings"). 
Review comment: Perhaps make this more explicit in the docstring then? ########## File path: cpp/src/arrow/type.h ########## @@ -1517,6 +1517,12 @@ class ARROW_EXPORT FieldRef { std::string ToString() const; size_t hash() const; + struct Hash { + size_t operator()(const FieldRef& ref) const { return ref.hash(); } + }; + + explicit operator bool() const { return Equals(FieldPath{}); } Review comment: (or rename this `bool empty() const`?) ########## File path: cpp/src/arrow/type.h ########## @@ -1517,6 +1517,12 @@ class ARROW_EXPORT FieldRef { std::string ToString() const; size_t hash() const; + struct Hash { + size_t operator()(const FieldRef& ref) const { return ref.hash(); } + }; + + explicit operator bool() const { return Equals(FieldPath{}); } Review comment: I'm not sure I understand the semantics just by reading the code. Could you add a comment above? ########## File path: cpp/src/arrow/dataset/partition.cc ########## @@ -573,5 +530,192 @@ Result<std::shared_ptr<Schema>> PartitioningOrFactory::GetOrInferSchema( return factory()->Inspect(paths); } +// Transform an array of counts to offsets which will divide a ListArray +// into an equal number of slices with corresponding lengths. +inline Result<std::shared_ptr<Array>> CountsToOffsets( + std::shared_ptr<Int64Array> counts) { + Int32Builder offset_builder; + RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1)); + offset_builder.UnsafeAppend(0); + + for (int64_t i = 0; i < counts->length(); ++i) { + DCHECK_NE(counts->Value(i), 0); + auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i)); + offset_builder.UnsafeAppend(next_offset); + } + + std::shared_ptr<Array> offsets; + RETURN_NOT_OK(offset_builder.Finish(&offsets)); + return offsets; +} + +// Helper for simultaneous dictionary encoding of multiple arrays. +// +// The fused dictionary is the Cartesian product of the individual dictionaries. 
+// For example given two arrays A, B where A has unique values ["ex", "why"] +// and B has unique values [0, 1] the fused dictionary is the set of tuples +// [["ex", 0], ["ex", 1], ["why", 0], ["why", 1]]. +// +// TODO(bkietz) this capability belongs in an Action of the hash kernels, where +// it can be used to group aggregates without materializing a grouped batch. +// For the purposes of writing we need the materialized grouped batch anyway +// since no Writers accept a selection vector. +class StructDictionary { + public: + struct Encoded { + std::shared_ptr<Int32Array> indices; + std::shared_ptr<StructDictionary> dictionary; + }; + + static Result<Encoded> Encode(const ArrayVector& columns) { + Encoded out{nullptr, std::make_shared<StructDictionary>()}; + + for (const auto& column : columns) { + if (column->null_count() != 0) { + return Status::NotImplemented("Grouping on a field with nulls"); + } + + RETURN_NOT_OK(out.dictionary->AddOne(column, &out.indices)); + } + + return out; + } + + Result<std::shared_ptr<StructArray>> Decode(std::shared_ptr<Int32Array> fused_indices, + FieldVector fields) { + std::vector<Int32Builder> builders(dictionaries_.size()); + for (Int32Builder& b : builders) { + RETURN_NOT_OK(b.Resize(fused_indices->length())); + } + + std::vector<int32_t> codes(dictionaries_.size()); + for (int64_t i = 0; i < fused_indices->length(); ++i) { + Expand(fused_indices->Value(i), codes.data()); + + auto builder_it = builders.begin(); + for (int32_t index : codes) { + builder_it++->UnsafeAppend(index); + } + } + + ArrayVector columns(dictionaries_.size()); + for (size_t i = 0; i < dictionaries_.size(); ++i) { + std::shared_ptr<ArrayData> indices; + RETURN_NOT_OK(builders[i].FinishInternal(&indices)); + + ARROW_ASSIGN_OR_RAISE(Datum column, compute::Take(dictionaries_[i], indices)); + columns[i] = column.make_array(); + } + + return StructArray::Make(std::move(columns), std::move(fields)); + } + + private: + Status AddOne(Datum column, 
std::shared_ptr<Int32Array>* fused_indices) { + ArrayData* encoded; + if (column.type()->id() != Type::DICTIONARY) { + ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column)); + } + encoded = column.mutable_array(); + + auto indices = + std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1])); Review comment: Similarly, `std::move` seems wrong if the column is already dictionary-encoded. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org