This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 85fc004 ARROW-1572: [C++] Implement "value counts" kernels for
tabulating value frequencies
85fc004 is described below
commit 85fc004bd946272d4daa90f8f6b173c89cdb3162
Author: Micah Kornfield <[email protected]>
AuthorDate: Wed Mar 13 20:57:06 2019 -0500
ARROW-1572: [C++] Implement "value counts" kernels for tabulating value
frequencies
Picked up from: https://github.com/apache/arrow/pull/1970
I mostly reused the unit tests as is and modified the rest based on
feedback on that PR and adapting to new code. Some other changes I mae:
1. Remove the HashKernel and getter from the public API. I think we can
add these back once we have a better idea on how we are doing stateful kernels
(i.e. https://github.com/apache/arrow/pull/3407)
2. Add operator[] to NumericBuilder to modify previously added values (this
might not be the right place, and I had a little bit of trouble figuring out
how to integrate this into the existing TypedTest so the testing is a little
bit weak). This seemed better then using a vector to collect values.
If this approach looks OK for now. I'm also going to open up a JIRA to try
refactor the unittest for Hash kernels (and maybe the headers) since I think
there might be a clearer more granular way of laying these out.
other things to discuss:
1. Handling null values in Count/Unique (it looks like they are dropped
today, should this configurable/turned on).
2. Hashing edge cases for floating point numbers (just opened a JIRA on
this).
Author: Micah Kornfield <[email protected]>
Author: Alessandro Andrioni <[email protected]>
Closes #3579 from emkornfield/count_kernel and squashes the following
commits:
9c55f7ba6 <Micah Kornfield> make 64 bit
dd0d8a155 <Micah Kornfield> fix link and warning
72095ebc4 <Micah Kornfield> Templatize whether to use return status
54afb2bac <Micah Kornfield> change from std::copy to memcopy
2973fccbe <Micah Kornfield> Address code review comments
e7e624b1f <Micah Kornfield> Add constants, per code review
4770d9924 <Micah Kornfield> fix warning
c6f6ad72f <Micah Kornfield> address feedback
d99e52fb7 <Micah Kornfield> add guard to CopyValue in cases where vector is
empty
8c26b0154 <Micah Kornfield> fix format
b7d54929a <Micah Kornfield> add null test
f964bd6da <Micah Kornfield> Rebase
e8e58a5b9 <Micah Kornfield> Address output type code review feedback
defb4f1a1 <Micah Kornfield> remove export from .cc
0152f2fa5 <Micah Kornfield> plumb through status on hash visitors
afeb1ad04 <Micah Kornfield> add real jira
96858bd52 <Micah Kornfield> Use macro inversion to reduce boiler plate
0dd007718 <Micah Kornfield> minimal test
57349f7ea <Micah Kornfield> unit tests passing
34834f711 <Alessandro Andrioni> First try at implementing a CountValues
kernel
---
cpp/src/arrow/array-test.cc | 16 ++
cpp/src/arrow/array/builder_primitive.h | 6 +
cpp/src/arrow/compute/kernels/boolean-test.cc | 1 -
cpp/src/arrow/compute/kernels/hash-test.cc | 135 +++++++++++-
cpp/src/arrow/compute/kernels/hash.cc | 305 +++++++++++++++++++-------
cpp/src/arrow/compute/kernels/hash.h | 58 +++--
cpp/src/arrow/util/hashing.h | 5 +-
7 files changed, 410 insertions(+), 116 deletions(-)
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index f6f66d2..9451e00 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -494,6 +494,22 @@ void TestPrimitiveBuilder<PBoolean>::Check(const
std::unique_ptr<BooleanBuilder>
ASSERT_EQ(0, builder->null_count());
}
+TEST(NumericBuilderAccessors, TestSettersGetters) {
+ int64_t datum = 42;
+ int64_t new_datum = 43;
+ NumericBuilder<Int64Type> builder(int64(), default_memory_pool());
+
+ builder.Reset();
+ ASSERT_OK(builder.Append(datum));
+ ASSERT_EQ(builder.GetValue(0), datum);
+
+ // Now update the value.
+ builder[0] = new_datum;
+
+ ASSERT_EQ(builder.GetValue(0), new_datum);
+ ASSERT_EQ(((const NumericBuilder<Int64Type>&)builder)[0], new_datum);
+}
+
typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8,
PInt16,
PInt32, PInt64, PFloat, PDouble>
Primitives;
diff --git a/cpp/src/arrow/array/builder_primitive.h
b/cpp/src/arrow/array/builder_primitive.h
index 5a9b694..95cfaa7 100644
--- a/cpp/src/arrow/array/builder_primitive.h
+++ b/cpp/src/arrow/array/builder_primitive.h
@@ -92,6 +92,12 @@ class NumericBuilder : public ArrayBuilder {
return ArrayBuilder::Resize(capacity);
}
+ value_type operator[](int64_t index) const { return GetValue(index); }
+
+ value_type& operator[](int64_t index) {
+ return reinterpret_cast<value_type*>(data_builder_.mutable_data())[index];
+ }
+
/// \brief Append a sequence of elements in one shot
/// \param[in] values a contiguous C array of values
/// \param[in] length the number of values to append
diff --git a/cpp/src/arrow/compute/kernels/boolean-test.cc
b/cpp/src/arrow/compute/kernels/boolean-test.cc
index 439e0db..5e1da1b 100644
--- a/cpp/src/arrow/compute/kernels/boolean-test.cc
+++ b/cpp/src/arrow/compute/kernels/boolean-test.cc
@@ -129,7 +129,6 @@ TEST_F(TestBooleanKernel, Invert) {
}
TEST_F(TestBooleanKernel, InvertEmptyArray) {
- auto type = boolean();
std::vector<std::shared_ptr<Buffer>> data_buffers(2);
Datum input;
input.value = ArrayData::Make(boolean(), 0 /* length */,
std::move(data_buffers),
diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc
b/cpp/src/arrow/compute/kernels/hash-test.cc
index 84eec8b..d4ffa55 100644
--- a/cpp/src/arrow/compute/kernels/hash-test.cc
+++ b/cpp/src/arrow/compute/kernels/hash-test.cc
@@ -43,6 +43,8 @@
#include "arrow/compute/kernels/util-internal.h"
#include "arrow/compute/test-util.h"
+#include "arrow/ipc/json-simple.h"
+
using std::shared_ptr;
using std::vector;
@@ -61,10 +63,48 @@ void CheckUnique(FunctionContext* ctx, const
shared_ptr<DataType>& type,
shared_ptr<Array> result;
ASSERT_OK(Unique(ctx, input, &result));
+ // TODO: We probably shouldn't rely on array ordering.
ASSERT_ARRAYS_EQUAL(*expected, *result);
}
template <typename Type, typename T>
+void CheckValueCountsNull(FunctionContext* ctx, const shared_ptr<DataType>&
type) {
+ std::vector<std::shared_ptr<Buffer>> data_buffers(2);
+ Datum input;
+ input.value =
+ ArrayData::Make(type, 0 /* length */, std::move(data_buffers), 0 /*
null_count */);
+
+ shared_ptr<Array> ex_values = ArrayFromJSON(type, "[]");
+ shared_ptr<Array> ex_counts = ArrayFromJSON(int64(), "[]");
+
+ shared_ptr<Array> result;
+ ASSERT_OK(ValueCounts(ctx, input, &result));
+ auto result_struct = std::dynamic_pointer_cast<StructArray>(result);
+ ASSERT_NE(result_struct->GetFieldByName(kValuesFieldName), nullptr);
+ // TODO: We probably shouldn't rely on value ordering.
+ ASSERT_ARRAYS_EQUAL(*ex_values,
*result_struct->GetFieldByName(kValuesFieldName));
+ ASSERT_ARRAYS_EQUAL(*ex_counts,
*result_struct->GetFieldByName(kCountsFieldName));
+}
+
+template <typename Type, typename T>
+void CheckValueCounts(FunctionContext* ctx, const shared_ptr<DataType>& type,
+ const vector<T>& in_values, const vector<bool>&
in_is_valid,
+ const vector<T>& out_values, const vector<bool>&
out_is_valid,
+ const vector<int64_t>& out_counts) {
+ shared_ptr<Array> input = _MakeArray<Type, T>(type, in_values, in_is_valid);
+ shared_ptr<Array> ex_values = _MakeArray<Type, T>(type, out_values,
out_is_valid);
+ shared_ptr<Array> ex_counts =
+ _MakeArray<Int64Type, int64_t>(int64(), out_counts, out_is_valid);
+
+ shared_ptr<Array> result;
+ ASSERT_OK(ValueCounts(ctx, input, &result));
+ auto result_struct = std::dynamic_pointer_cast<StructArray>(result);
+ // TODO: We probably shouldn't rely on value ordering.
+ ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->field(kValuesFieldIndex));
+ ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->field(kCountsFieldIndex));
+}
+
+template <typename Type, typename T>
void CheckDictEncode(FunctionContext* ctx, const shared_ptr<DataType>& type,
const vector<T>& in_values, const vector<bool>&
in_is_valid,
const vector<T>& out_values, const vector<bool>&
out_is_valid,
@@ -104,6 +144,16 @@ TYPED_TEST(TestHashKernelPrimitive, Unique) {
{3, 1}, {});
}
+TYPED_TEST(TestHashKernelPrimitive, ValueCounts) {
+ using T = typename TypeParam::c_type;
+ auto type = TypeTraits<TypeParam>::type_singleton();
+ CheckValueCounts<TypeParam, T>(&this->ctx_, type, {2, 1, 2, 1, 2, 3, 4},
+ {true, false, true, true, true, true, false},
{2, 1, 3},
+ {}, {3, 1, 1});
+ CheckValueCounts<TypeParam, T>(&this->ctx_, type, {}, {}, {}, {}, {});
+ CheckValueCountsNull<TypeParam, T>(&this->ctx_, type);
+}
+
TYPED_TEST(TestHashKernelPrimitive, DictEncode) {
using T = typename TypeParam::c_type;
auto type = TypeTraits<TypeParam>::type_singleton();
@@ -121,19 +171,21 @@ TYPED_TEST(TestHashKernelPrimitive, PrimitiveResizeTable)
{
vector<T> values;
vector<T> uniques;
vector<int32_t> indices;
+ vector<int64_t> counts;
for (int64_t i = 0; i < kTotalValues * kRepeats; i++) {
const auto val = static_cast<T>(i % kTotalValues);
values.push_back(val);
if (i < kTotalValues) {
uniques.push_back(val);
+ counts.push_back(kRepeats);
}
indices.push_back(static_cast<int32_t>(i % kTotalValues));
}
auto type = TypeTraits<TypeParam>::type_singleton();
CheckUnique<TypeParam, T>(&this->ctx_, type, values, {}, uniques, {});
-
+ CheckValueCounts<TypeParam, T>(&this->ctx_, type, values, {}, uniques, {},
counts);
CheckDictEncode<TypeParam, T>(&this->ctx_, type, values, {}, uniques, {},
indices);
}
@@ -149,6 +201,19 @@ TEST_F(TestHashKernel, UniqueTimeTimestamp) {
{});
}
+TEST_F(TestHashKernel, ValueCountsTimeTimestamp) {
+ CheckValueCounts<Time32Type, int32_t>(&this->ctx_, time32(TimeUnit::SECOND),
+ {2, 1, 2, 1}, {true, false, true,
true}, {2, 1},
+ {}, {2, 1});
+
+ CheckValueCounts<Time64Type, int64_t>(&this->ctx_, time64(TimeUnit::NANO),
{2, 1, 2, 1},
+ {true, false, true, true}, {2, 1}, {},
{2, 1});
+
+ CheckValueCounts<TimestampType, int64_t>(&this->ctx_,
timestamp(TimeUnit::NANO),
+ {2, 1, 2, 1}, {true, false, true,
true},
+ {2, 1}, {}, {2, 1});
+}
+
TEST_F(TestHashKernel, UniqueBoolean) {
CheckUnique<BooleanType, bool>(&this->ctx_, boolean(), {true, true, false,
true},
{true, false, true, true}, {true, false}, {});
@@ -164,6 +229,23 @@ TEST_F(TestHashKernel, UniqueBoolean) {
{false, true}, {});
}
+TEST_F(TestHashKernel, ValueCountsBoolean) {
+ CheckValueCounts<BooleanType, bool>(&this->ctx_, boolean(), {true, true,
false, true},
+ {true, false, true, true}, {true,
false}, {},
+ {2, 1});
+
+ CheckValueCounts<BooleanType, bool>(&this->ctx_, boolean(), {false, true,
false, true},
+ {true, false, true, true}, {false,
true}, {},
+ {2, 1});
+
+ // No nulls
+ CheckValueCounts<BooleanType, bool>(&this->ctx_, boolean(), {true, true,
false, true},
+ {}, {true, false}, {}, {3, 1});
+
+ CheckValueCounts<BooleanType, bool>(&this->ctx_, boolean(), {false, true,
false, true},
+ {}, {false, true}, {}, {2, 2});
+}
+
TEST_F(TestHashKernel, DictEncodeBoolean) {
CheckDictEncode<BooleanType, bool>(
&this->ctx_, boolean(), {true, true, false, true, false},
@@ -192,6 +274,16 @@ TEST_F(TestHashKernel, UniqueBinary) {
{true, false, true, true}, {"test",
"test2"}, {});
}
+TEST_F(TestHashKernel, ValueCountsBinary) {
+ CheckValueCounts<BinaryType, std::string>(
+ &this->ctx_, binary(), {"test", "", "test2", "test"}, {true, false,
true, true},
+ {"test", "test2"}, {}, {2, 1});
+
+ CheckValueCounts<StringType, std::string>(
+ &this->ctx_, utf8(), {"test", "", "test2", "test"}, {true, false, true,
true},
+ {"test", "test2"}, {}, {2, 1});
+}
+
TEST_F(TestHashKernel, DictEncodeBinary) {
CheckDictEncode<BinaryType, std::string>(
&this->ctx_, binary(), {"test", "", "test2", "test", "baz"},
@@ -214,6 +306,7 @@ TEST_F(TestHashKernel, BinaryResizeTable) {
vector<std::string> values;
vector<std::string> uniques;
vector<int32_t> indices;
+ vector<int64_t> counts;
char buf[20] = "test";
for (int32_t i = 0; i < kTotalValues * kRepeats; i++) {
@@ -224,15 +317,21 @@ TEST_F(TestHashKernel, BinaryResizeTable) {
if (i < kTotalValues) {
uniques.push_back(values.back());
+ counts.push_back(kRepeats);
}
indices.push_back(index);
}
CheckUnique<BinaryType, std::string>(&this->ctx_, binary(), values, {},
uniques, {});
+ CheckValueCounts<BinaryType, std::string>(&this->ctx_, binary(), values, {},
uniques,
+ {}, counts);
+
CheckDictEncode<BinaryType, std::string>(&this->ctx_, binary(), values, {},
uniques, {},
indices);
CheckUnique<StringType, std::string>(&this->ctx_, utf8(), values, {},
uniques, {});
+ CheckValueCounts<StringType, std::string>(&this->ctx_, utf8(), values, {},
uniques, {},
+ counts);
CheckDictEncode<StringType, std::string>(&this->ctx_, utf8(), values, {},
uniques, {},
indices);
}
@@ -291,6 +390,15 @@ TEST_F(TestHashKernel, UniqueDecimal) {
{true, false, true, true}, expected,
{});
}
+TEST_F(TestHashKernel, ValueCountsDecimal) {
+ vector<Decimal128> values{12, 12, 11, 12};
+ vector<Decimal128> expected{12, 11};
+
+ CheckValueCounts<Decimal128Type, Decimal128>(&this->ctx_, decimal(2, 0),
values,
+ {true, false, true, true},
expected, {},
+ {2, 1});
+}
+
TEST_F(TestHashKernel, DictEncodeDecimal) {
vector<Decimal128> values{12, 12, 11, 12, 13};
vector<Decimal128> expected{12, 11, 13};
@@ -300,6 +408,20 @@ TEST_F(TestHashKernel, DictEncodeDecimal) {
{}, {0, 0, 1, 0, 2});
}
+/* TODO(ARROW-4124): Determine if we wan to do something that is reproducable
with floats.
+TEST_F(TestHashKernel, ValueCountsFloat) {
+
+ // No nulls
+ CheckValueCounts<FloatType, float>(&this->ctx_, float32(), {1.0f, 0.0f,
-0.0f,
+std::nan("1"), std::nan("2") },
+ {}, {0.0f, 1.0f, std::nan("1")}, {}, {});
+
+ CheckValueCounts<DoubleType, double>(&this->ctx_, float64(), {1.0f, 0.0f,
-0.0f,
+std::nan("1"), std::nan("2") },
+ {}, {0.0f, 1.0f, std::nan("1")}, {}, {});
+}
+*/
+
TEST_F(TestHashKernel, ChunkedArrayInvoke) {
vector<std::string> values1 = {"foo", "bar", "foo"};
vector<std::string> values2 = {"bar", "baz", "quuux", "foo"};
@@ -311,6 +433,9 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) {
vector<std::string> dict_values = {"foo", "bar", "baz", "quuux"};
auto ex_dict = _MakeArray<StringType, std::string>(type, dict_values, {});
+ vector<int64_t> counts = {3, 2, 1, 1};
+ auto ex_counts = _MakeArray<Int64Type, int64_t>(int64(), counts, {});
+
ArrayVector arrays = {a1, a2};
auto carr = std::make_shared<ChunkedArray>(arrays);
@@ -329,6 +454,14 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) {
std::make_shared<DictionaryArray>(dict_type, i2)};
auto dict_carr = std::make_shared<ChunkedArray>(dict_arrays);
+ // Unique counts
+ shared_ptr<Array> counts_array;
+ ASSERT_OK(ValueCounts(&this->ctx_, carr, &counts_array));
+ auto counts_struct = std::dynamic_pointer_cast<StructArray>(counts_array);
+ ASSERT_ARRAYS_EQUAL(*ex_dict, *counts_struct->field(0));
+ ASSERT_ARRAYS_EQUAL(*ex_counts, *counts_struct->field(1));
+
+ // Dictionary encode
Datum encoded_out;
ASSERT_OK(DictionaryEncode(&this->ctx_, carr, &encoded_out));
ASSERT_EQ(Datum::CHUNKED_ARRAY, encoded_out.kind());
diff --git a/cpp/src/arrow/compute/kernels/hash.cc
b/cpp/src/arrow/compute/kernels/hash.cc
index f443282..2a3031f 100644
--- a/cpp/src/arrow/compute/kernels/hash.cc
+++ b/cpp/src/arrow/compute/kernels/hash.cc
@@ -88,15 +88,80 @@ class UniqueAction : public ActionBase {
void ObserveFound(Index index) {}
template <class Index>
+ void ObserveNotFound(Index index, Status* err_status) {
+ ARROW_LOG(FATAL) << "ObserveNotFound with err_status should not be called";
+ }
+
+ template <class Index>
void ObserveNotFound(Index index) {}
Status Flush(Datum* out) { return Status::OK(); }
std::shared_ptr<DataType> out_type() const { return type_; }
+
+ Status FlushFinal(Datum* out) { return Status::OK(); }
};
// ----------------------------------------------------------------------
-// Dictionary encode implementation
+// Count values implementation (see HashKernel for description of methods)
+
+class ValueCountsAction : ActionBase {
+ public:
+ using ActionBase::ActionBase;
+
+ ValueCountsAction(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+ : ActionBase(type, pool), count_builder_(pool) {}
+
+ Status Reserve(const int64_t length) {
+ // builder size is independent of input array size.
+ return Status::OK();
+ }
+
+ Status Reset() {
+ count_builder_.Reset();
+ return Status::OK();
+ }
+
+ // Don't do anything on flush because we don't want to finalize the builder
+ // or incur the cost of memory copies.
+ Status Flush(Datum* out) { return Status::OK(); }
+
+ std::shared_ptr<DataType> out_type() const { return type_; }
+
+ // Return the counts corresponding the MemoTable keys.
+ Status FlushFinal(Datum* out) {
+ std::shared_ptr<ArrayData> result;
+ RETURN_NOT_OK(count_builder_.FinishInternal(&result));
+ out->value = std::move(result);
+ return Status::OK();
+ }
+
+ void ObserveNull() {}
+
+ template <class Index>
+ void ObserveFound(Index slot) {
+ count_builder_[slot]++;
+ }
+
+ template <class Index>
+ void ObserveNotFound(Index slot) {
+ ARROW_LOG(FATAL) << "ObserveNotFound without err_status should not be
called";
+ }
+
+ template <class Index>
+ void ObserveNotFound(Index slot, Status* status) {
+ Status s = count_builder_.Append(1);
+ if (ARROW_PREDICT_FALSE(!s.ok())) {
+ *status = s;
+ }
+ }
+
+ private:
+ Int64Builder count_builder_;
+};
+
+// ----------------------------------------------------------------------
+// Dictionary encode implementation (see HashKernel for description of methods)
class DictEncodeAction : public ActionBase {
public:
@@ -119,7 +184,12 @@ class DictEncodeAction : public ActionBase {
template <class Index>
void ObserveNotFound(Index index) {
- return ObserveFound(index);
+ ObserveFound(index);
+ }
+
+ template <class Index>
+ void ObserveNotFound(Index index, Status* err_status) {
+ ARROW_LOG(FATAL) << "ObserveNotFound with err_status should not be called";
}
Status Flush(Datum* out) {
@@ -130,11 +200,33 @@ class DictEncodeAction : public ActionBase {
}
std::shared_ptr<DataType> out_type() const { return int32(); }
+ Status FlushFinal(Datum* out) { return Status::OK(); }
private:
Int32Builder indices_builder_;
};
+/// \brief Invoke hash table kernel on input array, returning any output
+/// values. Implementations should be thread-safe
+///
+/// This interface is implemented below using visitor pattern on "Action"
+/// implementations. It is not consolidate to keep the contract clearer.
+class HashKernel : public UnaryKernel {
+ public:
+ // Reset for another run.
+ virtual Status Reset() = 0;
+ // Prepare the Action for the given input (e.g. reserve appropriately sized
+ // data structures) and visit the given input with Action.
+ virtual Status Append(FunctionContext* ctx, const ArrayData& input) = 0;
+ // Flush out accumulated results from the last invocation of Call.
+ virtual Status Flush(Datum* out) = 0;
+ // Flush out accumulated results across all invocations of Call. The kernel
+ // should not be used until after Reset() is called.
+ virtual Status FlushFinal(Datum* out) = 0;
+ // Get the values (keys) acummulated in the dictionary so far.
+ virtual Status GetDictionary(std::shared_ptr<ArrayData>* out) = 0;
+};
+
// ----------------------------------------------------------------------
// Base class for all hash kernel implementations
@@ -161,7 +253,7 @@ class HashKernelImpl : public HashKernel {
// Base class for all "regular" hash kernel implementations
// (NullType has a separate implementation)
-template <typename Type, typename Scalar, typename Action>
+template <typename Type, typename Scalar, typename Action, bool
with_error_status = false>
class RegularHashKernelImpl : public HashKernelImpl {
public:
RegularHashKernelImpl(const std::shared_ptr<DataType>& type, MemoryPool*
pool)
@@ -179,6 +271,8 @@ class RegularHashKernelImpl : public HashKernelImpl {
Status Flush(Datum* out) override { return action_.Flush(out); }
+ Status FlushFinal(Datum* out) override { return action_.FlushFinal(out); }
+
Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
return DictionaryTraits<Type>::GetDictionaryArrayData(pool_, type_,
*memo_table_,
0 /* start_offset
*/, out);
@@ -186,15 +280,26 @@ class RegularHashKernelImpl : public HashKernelImpl {
Status VisitNull() {
action_.ObserveNull();
- return Status::OK();
+ return Status::Status::OK();
}
Status VisitValue(const Scalar& value) {
auto on_found = [this](int32_t memo_index) {
action_.ObserveFound(memo_index); };
- auto on_not_found = [this](int32_t memo_index) {
- action_.ObserveNotFound(memo_index);
- };
- memo_table_->GetOrInsert(value, on_found, on_not_found);
+
+ if (with_error_status) {
+ Status status;
+ auto on_not_found = [this, &status](int32_t memo_index) {
+ action_.ObserveNotFound(memo_index, &status);
+ };
+ memo_table_->GetOrInsert(value, on_found, on_not_found);
+ return status;
+ } else {
+ auto on_not_found = [this](int32_t memo_index) {
+ action_.ObserveNotFound(memo_index);
+ };
+
+ memo_table_->GetOrInsert(value, on_found, on_not_found);
+ }
return Status::OK();
}
@@ -229,6 +334,7 @@ class NullHashKernelImpl : public HashKernelImpl {
}
Status Flush(Datum* out) override { return action_.Flush(out); }
+ Status FlushFinal(Datum* out) override { return action_.FlushFinal(out); }
Status GetDictionary(std::shared_ptr<ArrayData>* out) override {
// TODO(wesm): handle null being a valid dictionary value
@@ -248,74 +354,80 @@ class NullHashKernelImpl : public HashKernelImpl {
// ----------------------------------------------------------------------
// Kernel wrapper for generic hash table kernels
-template <typename Type, typename Action, typename Enable = void>
+template <typename Type, typename Action, bool with_error_status, typename
Enable = void>
struct HashKernelTraits {};
-template <typename Type, typename Action>
-struct HashKernelTraits<Type, Action, enable_if_null<Type>> {
+template <typename Type, typename Action, bool with_error_status>
+struct HashKernelTraits<Type, Action, with_error_status, enable_if_null<Type>>
{
using HashKernelImpl = NullHashKernelImpl<Action>;
};
-template <typename Type, typename Action>
-struct HashKernelTraits<Type, Action, enable_if_has_c_type<Type>> {
- using HashKernelImpl = RegularHashKernelImpl<Type, typename Type::c_type,
Action>;
+template <typename Type, typename Action, bool with_error_status>
+struct HashKernelTraits<Type, Action, with_error_status,
enable_if_has_c_type<Type>> {
+ using HashKernelImpl =
+ RegularHashKernelImpl<Type, typename Type::c_type, Action,
with_error_status>;
};
-template <typename Type, typename Action>
-struct HashKernelTraits<Type, Action, enable_if_boolean<Type>> {
- using HashKernelImpl = RegularHashKernelImpl<Type, bool, Action>;
+template <typename Type, typename Action, bool with_error_status>
+struct HashKernelTraits<Type, Action, with_error_status,
enable_if_boolean<Type>> {
+ using HashKernelImpl = RegularHashKernelImpl<Type, bool, Action,
with_error_status>;
};
-template <typename Type, typename Action>
-struct HashKernelTraits<Type, Action, enable_if_binary<Type>> {
- using HashKernelImpl = RegularHashKernelImpl<Type, util::string_view,
Action>;
+template <typename Type, typename Action, bool with_error_status>
+struct HashKernelTraits<Type, Action, with_error_status,
enable_if_binary<Type>> {
+ using HashKernelImpl =
+ RegularHashKernelImpl<Type, util::string_view, Action,
with_error_status>;
};
-template <typename Type, typename Action>
-struct HashKernelTraits<Type, Action, enable_if_fixed_size_binary<Type>> {
- using HashKernelImpl = RegularHashKernelImpl<Type, util::string_view,
Action>;
+template <typename Type, typename Action, bool with_error_status>
+struct HashKernelTraits<Type, Action, with_error_status,
+ enable_if_fixed_size_binary<Type>> {
+ using HashKernelImpl =
+ RegularHashKernelImpl<Type, util::string_view, Action,
with_error_status>;
};
} // namespace
+#define PROCESS_SUPPORTED_HASH_TYPES(PROCESS) \
+ PROCESS(NullType) \
+ PROCESS(BooleanType) \
+ PROCESS(UInt8Type) \
+ PROCESS(Int8Type) \
+ PROCESS(UInt16Type) \
+ PROCESS(Int16Type) \
+ PROCESS(UInt32Type) \
+ PROCESS(Int32Type) \
+ PROCESS(UInt64Type) \
+ PROCESS(Int64Type) \
+ PROCESS(FloatType) \
+ PROCESS(DoubleType) \
+ PROCESS(Date32Type) \
+ PROCESS(Date64Type) \
+ PROCESS(Time32Type) \
+ PROCESS(Time64Type) \
+ PROCESS(TimestampType) \
+ PROCESS(BinaryType) \
+ PROCESS(StringType) \
+ PROCESS(FixedSizeBinaryType) \
+ PROCESS(Decimal128Type)
+
Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr<DataType>&
type,
std::unique_ptr<HashKernel>* out) {
std::unique_ptr<HashKernel> kernel;
-
-#define UNIQUE_CASE(InType)
\
- case InType::type_id:
\
- kernel.reset(new typename HashKernelTraits<InType,
UniqueAction>::HashKernelImpl( \
- type, ctx->memory_pool()));
\
- break
-
switch (type->id()) {
- UNIQUE_CASE(NullType);
- UNIQUE_CASE(BooleanType);
- UNIQUE_CASE(UInt8Type);
- UNIQUE_CASE(Int8Type);
- UNIQUE_CASE(UInt16Type);
- UNIQUE_CASE(Int16Type);
- UNIQUE_CASE(UInt32Type);
- UNIQUE_CASE(Int32Type);
- UNIQUE_CASE(UInt64Type);
- UNIQUE_CASE(Int64Type);
- UNIQUE_CASE(FloatType);
- UNIQUE_CASE(DoubleType);
- UNIQUE_CASE(Date32Type);
- UNIQUE_CASE(Date64Type);
- UNIQUE_CASE(Time32Type);
- UNIQUE_CASE(Time64Type);
- UNIQUE_CASE(TimestampType);
- UNIQUE_CASE(BinaryType);
- UNIQUE_CASE(StringType);
- UNIQUE_CASE(FixedSizeBinaryType);
- UNIQUE_CASE(Decimal128Type);
+#define PROCESS(InType)
\
+ case InType::type_id:
\
+ kernel.reset(new
\
+ typename HashKernelTraits<InType, UniqueAction,
false>::HashKernelImpl( \
+ type, ctx->memory_pool()));
\
+ break;
+
+ PROCESS_SUPPORTED_HASH_TYPES(PROCESS)
+#undef PROCESS
default:
break;
}
-#undef UNIQUE_CASE
-
CHECK_IMPLEMENTED(kernel, "unique", type);
RETURN_NOT_OK(kernel->Reset());
*out = std::move(kernel);
@@ -327,35 +439,16 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx,
std::unique_ptr<HashKernel>* out) {
std::unique_ptr<HashKernel> kernel;
-#define DICTIONARY_ENCODE_CASE(InType)
\
- case InType::type_id:
\
- kernel.reset(new
\
- typename HashKernelTraits<InType,
DictEncodeAction>::HashKernelImpl( \
- type, ctx->memory_pool()));
\
- break
-
switch (type->id()) {
- DICTIONARY_ENCODE_CASE(NullType);
- DICTIONARY_ENCODE_CASE(BooleanType);
- DICTIONARY_ENCODE_CASE(UInt8Type);
- DICTIONARY_ENCODE_CASE(Int8Type);
- DICTIONARY_ENCODE_CASE(UInt16Type);
- DICTIONARY_ENCODE_CASE(Int16Type);
- DICTIONARY_ENCODE_CASE(UInt32Type);
- DICTIONARY_ENCODE_CASE(Int32Type);
- DICTIONARY_ENCODE_CASE(UInt64Type);
- DICTIONARY_ENCODE_CASE(Int64Type);
- DICTIONARY_ENCODE_CASE(FloatType);
- DICTIONARY_ENCODE_CASE(DoubleType);
- DICTIONARY_ENCODE_CASE(Date32Type);
- DICTIONARY_ENCODE_CASE(Date64Type);
- DICTIONARY_ENCODE_CASE(Time32Type);
- DICTIONARY_ENCODE_CASE(Time64Type);
- DICTIONARY_ENCODE_CASE(TimestampType);
- DICTIONARY_ENCODE_CASE(BinaryType);
- DICTIONARY_ENCODE_CASE(StringType);
- DICTIONARY_ENCODE_CASE(FixedSizeBinaryType);
- DICTIONARY_ENCODE_CASE(Decimal128Type);
+#define PROCESS(InType)
\
+ case InType::type_id:
\
+ kernel.reset(
\
+ new typename HashKernelTraits<InType, DictEncodeAction,
false>::HashKernelImpl( \
+ type, ctx->memory_pool()));
\
+ break;
+
+ PROCESS_SUPPORTED_HASH_TYPES(PROCESS)
+#undef PROCESS
default:
break;
}
@@ -368,6 +461,30 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx,
return Status::OK();
}
+Status GetValueCountsKernel(FunctionContext* ctx, const
std::shared_ptr<DataType>& type,
+ std::unique_ptr<HashKernel>* out) {
+ std::unique_ptr<HashKernel> kernel;
+
+ switch (type->id()) {
+#define PROCESS(InType)
\
+ case InType::type_id:
\
+ kernel.reset(
\
+ new typename HashKernelTraits<InType, ValueCountsAction,
true>::HashKernelImpl( \
+ type, ctx->memory_pool()));
\
+ break;
+
+ PROCESS_SUPPORTED_HASH_TYPES(PROCESS)
+#undef PROCESS
+ default:
+ break;
+ }
+
+ CHECK_IMPLEMENTED(kernel, "count-values", type);
+ RETURN_NOT_OK(kernel->Reset());
+ *out = std::move(kernel);
+ return Status::OK();
+}
+
namespace {
Status InvokeHash(FunctionContext* ctx, HashKernel* func, const Datum& value,
@@ -415,5 +532,31 @@ Status DictionaryEncode(FunctionContext* ctx, const Datum&
value, Datum* out) {
return Status::OK();
}
+const char kValuesFieldName[] = "values";
+const char kCountsFieldName[] = "counts";
+const int32_t kValuesFieldIndex = 0;
+const int32_t kCountsFieldIndex = 1;
+Status ValueCounts(FunctionContext* ctx, const Datum& value,
+ std::shared_ptr<Array>* counts) {
+ std::unique_ptr<HashKernel> func;
+ RETURN_NOT_OK(GetValueCountsKernel(ctx, value.type(), &func));
+
+ // Calls return nothing for counts.
+ std::vector<Datum> unused_output;
+ std::shared_ptr<Array> uniques;
+ RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &unused_output, &uniques));
+
+ Datum value_counts;
+ RETURN_NOT_OK(func->FlushFinal(&value_counts));
+
+ auto data_type =
std::make_shared<StructType>(std::vector<std::shared_ptr<Field>>{
+ std::make_shared<Field>(kValuesFieldName, uniques->type()),
+ std::make_shared<Field>(kCountsFieldName, int64())});
+ *counts = std::make_shared<StructArray>(
+ data_type, uniques->length(),
+ std::vector<std::shared_ptr<Array>>{uniques,
MakeArray(value_counts.array())});
+ return Status::OK();
+}
+#undef PROCESS_SUPPORTED_HASH_TYPES
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/kernels/hash.h
b/cpp/src/arrow/compute/kernels/hash.h
index 6bbe3cf..edc7c49 100644
--- a/cpp/src/arrow/compute/kernels/hash.h
+++ b/cpp/src/arrow/compute/kernels/hash.h
@@ -34,29 +34,10 @@ namespace compute {
class FunctionContext;
-/// \brief Invoke hash table kernel on input array, returning any output
-/// values. Implementations should be thread-safe
-class ARROW_EXPORT HashKernel : public UnaryKernel {
- public:
- // XXX why are those methods exposed?
- virtual Status Reset() = 0;
- virtual Status Append(FunctionContext* ctx, const ArrayData& input) = 0;
- virtual Status Flush(Datum* out) = 0;
- virtual Status GetDictionary(std::shared_ptr<ArrayData>* out) = 0;
-};
-
-/// \since 0.8.0
-/// \note API not yet finalized
-ARROW_EXPORT
-Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr<DataType>&
type,
- std::unique_ptr<HashKernel>* kernel);
-
-ARROW_EXPORT
-Status GetDictionaryEncodeKernel(FunctionContext* ctx,
- const std::shared_ptr<DataType>& type,
- std::unique_ptr<HashKernel>* kernel);
-
/// \brief Compute unique elements from an array-like object
+///
+/// Note if a null occurs in the input it will NOT be included in the output.
+///
/// \param[in] context the FunctionContext
/// \param[in] datum array-like input
/// \param[out] out result as Array
@@ -66,6 +47,29 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx,
ARROW_EXPORT
Status Unique(FunctionContext* context, const Datum& datum,
std::shared_ptr<Array>* out);
+// Constants for accessing the output of ValueCounts
+ARROW_EXPORT extern const char kValuesFieldName[];
+ARROW_EXPORT extern const char kCountsFieldName[];
+ARROW_EXPORT extern const int32_t kValuesFieldIndex;
+ARROW_EXPORT extern const int32_t kCountsFieldIndex;
+/// \brief Return counts of unique elements from an array-like object.
+///
+/// Note that the counts do not include counts for nulls in the array. These
can be
+/// obtained separately from metadata.
+///
+/// For floating point arrays there is no attempt to normalize -0.0, 0.0 and
NaN values
+/// which can lead to unexpected results if the input Array has these values.
+///
+/// \param[in] context the FunctionContext
+/// \param[in] value array-like input
+/// \param[out] counts An array of <input type "Values", int64_t "Counts">
structs.
+///
+/// \since 0.13.0
+/// \note API not yet finalized
+ARROW_EXPORT
+Status ValueCounts(FunctionContext* context, const Datum& value,
+ std::shared_ptr<Array>* counts);
+
/// \brief Dictionary-encode values in an array-like object
/// \param[in] context the FunctionContext
/// \param[in] data array-like input
@@ -81,11 +85,6 @@ Status DictionaryEncode(FunctionContext* context, const
Datum& data, Datum* out)
// TODO(wesm): Define API for regularizing DictionaryArray objects with
// different dictionaries
-// class DictionaryEncoder {
-// public:
-// virtual Encode(const Datum& data, Datum* out) = 0;
-// };
-
//
// ARROW_EXPORT
// Status DictionaryEncode(FunctionContext* context, const Datum& data,
@@ -100,11 +99,6 @@ Status DictionaryEncode(FunctionContext* context, const
Datum& data, Datum* out)
// Status IsIn(FunctionContext* context, const Datum& values, const Datum&
member_set,
// Datum* out);
-// ARROW_EXPORT
-// Status CountValues(FunctionContext* context, const Datum& values,
-// std::shared_ptr<Array>* out_uniques,
-// std::shared_ptr<Array>* out_counts);
-
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h
index 3dde0be..044d4e9 100644
--- a/cpp/src/arrow/util/hashing.h
+++ b/cpp/src/arrow/util/hashing.h
@@ -473,7 +473,10 @@ class SmallScalarMemoTable {
// Copy values starting from index `start` into `out_data`
void CopyValues(int32_t start, Scalar* out_data) const {
- memcpy(out_data, &index_to_value_[start], size() - start);
+ DCHECK_GE(start, 0);
+ DCHECK_LE(static_cast<size_t>(start), index_to_value_.size());
+ int64_t offset = start * static_cast<int32_t>(sizeof(Scalar));
+ memcpy(out_data, index_to_value_.data() + offset, (size() - start) *
sizeof(Scalar));
}
void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); }