westonpace commented on code in PR #13880:
URL: https://github.com/apache/arrow/pull/13880#discussion_r954354305
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -17,11 +17,14 @@
#include <gmock/gmock-matchers.h>
+#include <chrono>
+#include <iostream>
Review Comment:
```suggestion
```
##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -135,15 +135,33 @@ class ARROW_EXPORT KeyColumnArray {
/// Only valid if this is a view into a varbinary type
uint32_t* mutable_offsets() {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
}
/// \brief Return a read-only version of the offsets buffer
///
/// Only valid if this is a view into a varbinary type
const uint32_t* offsets() const {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
return reinterpret_cast<const uint32_t*>(data(kFixedLengthBuffer));
}
+ /// \brief Return a mutable version of the large-offsets buffer
+ ///
+ /// Only valid if this is a view into a large varbinary type
+ uint64_t* mutable_large_offsets() {
+ DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint64_t));
Review Comment:
```suggestion
DCHECK_EQ(metadata_.fixed_length, sizeof(uint64_t));
```
##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -141,6 +141,12 @@ Result<KeyColumnArray> ColumnArrayFromArrayData(
    const std::shared_ptr<ArrayData>& array_data, int64_t start_row, int64_t num_rows) {
ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
ColumnMetadataFromDataType(array_data->type));
+  return ColumnArrayFromArrayDataAndMetadata(array_data, metadata, start_row, num_rows);
+}
+
+KeyColumnArray ColumnArrayFromArrayDataAndMetadata(
+    const std::shared_ptr<ArrayData>& array_data, const KeyColumnMetadata& metadata,
+    int64_t start_row, int64_t num_rows) {
Review Comment:
Why did you split up these methods? Is it to allow you to avoid a call to
`ColumnMetadataFromDataType` if you have the column metadata lying around?
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -184,27 +290,77 @@ class InputState {
return queue_.UnsyncFront();
}
- KeyType GetLatestKey() const {
- return queue_.UnsyncFront()
- ->column_data(key_col_index_)
- ->GetValues<KeyType>(1)[latest_ref_row_];
+#define LATEST_VAL_CASE(id, val) \
+ case Type::id: { \
+ using T = typename TypeIdTraits<Type::id>::Type; \
+ using CType = typename TypeTraits<T>::CType; \
+ return val(data->GetValues<CType>(1)[latest_ref_row_]); \
+ }
+
+ ByType GetLatestKey() const {
+ const RecordBatch* batch = queue_.UnsyncFront().get();
+ if (must_hash_) {
+ return key_hasher_->HashesFor(batch)[latest_ref_row_];
+ }
+ auto data = batch->column_data(key_col_index_[0]);
+ switch (key_type_id_[0]) {
+ LATEST_VAL_CASE(INT8, key_value)
+ LATEST_VAL_CASE(INT16, key_value)
+ LATEST_VAL_CASE(INT32, key_value)
+ LATEST_VAL_CASE(INT64, key_value)
+ LATEST_VAL_CASE(UINT8, key_value)
+ LATEST_VAL_CASE(UINT16, key_value)
+ LATEST_VAL_CASE(UINT32, key_value)
+ LATEST_VAL_CASE(UINT64, key_value)
+ LATEST_VAL_CASE(DATE32, key_value)
+ LATEST_VAL_CASE(DATE64, key_value)
+ LATEST_VAL_CASE(TIME32, key_value)
+ LATEST_VAL_CASE(TIME64, key_value)
+ LATEST_VAL_CASE(TIMESTAMP, key_value)
+ default:
+ DCHECK(false);
+ return 0; // cannot happen
+ }
}
- int64_t GetLatestTime() const {
- return queue_.UnsyncFront()
- ->column_data(time_col_index_)
- ->GetValues<int64_t>(1)[latest_ref_row_];
+ OnType GetLatestTime() const {
+ auto data = queue_.UnsyncFront()->column_data(time_col_index_);
+ switch (time_type_id_) {
Review Comment:
This switch statement seems identical to the one above it. Can you combine
them?
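If it helps, here is a rough standalone sketch (simplified types; `GetLatestValue` and `TypeId` are hypothetical names, not the PR's API) of how the type dispatch could live in a single helper that both `GetLatestKey` and `GetLatestTime` call with their respective normalizers:
```
#include <cstdint>

// Simplified stand-ins for the Arrow type ids and raw column data.
enum class TypeId { INT32, INT64, UINT64 };

template <typename Normalizer>
uint64_t GetLatestValue(const void* column, TypeId id, uint64_t row,
                        Normalizer normalize) {
  switch (id) {
    case TypeId::INT32:
      return normalize(static_cast<const int32_t*>(column)[row]);
    case TypeId::INT64:
      return normalize(static_cast<const int64_t*>(column)[row]);
    case TypeId::UINT64:
      return normalize(static_cast<const uint64_t*>(column)[row]);
  }
  return 0;  // unreachable
}

// GetLatestKey would pass a lambda wrapping key_value and GetLatestTime one
// wrapping time_value, so the switch exists in only one place.
```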
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -32,23 +35,172 @@
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/make_unique.h"
+#include "arrow/util/string_view.h"
#include "arrow/util/thread_pool.h"
+#define TRACED_TEST(t_class, t_name) \
+ static void _##t_class##_##t_name(); \
+ TEST(t_class, t_name) { \
+ ARROW_SCOPED_TRACE(#t_class "_" #t_name); \
+ _##t_class##_##t_name(); \
+ } \
+ static void _##t_class##_##t_name()
+
using testing::UnorderedElementsAreArray;
namespace arrow {
namespace compute {
+bool is_temporal_primitive(Type::type type_id) {
+ switch (type_id) {
+ case Type::TIME32:
+ case Type::TIME64:
+ case Type::DATE32:
+ case Type::DATE64:
+ case Type::TIMESTAMP:
+ return true;
+ default:
+ return false;
+ }
+}
+
+BatchesWithSchema MakeBatchesFromNumString(
+ const std::shared_ptr<Schema>& schema,
+ const std::vector<util::string_view>& json_strings, int multiplicity = 1) {
+ FieldVector num_fields;
+ for (auto field : schema->fields()) {
+ num_fields.push_back(
+        is_base_binary_like(field->type()->id()) ? field->WithType(int64()) : field);
+ }
+ auto num_schema =
+      std::make_shared<Schema>(num_fields, schema->endianness(), schema->metadata());
+ BatchesWithSchema num_batches =
+ MakeBatchesFromString(num_schema, json_strings, multiplicity);
+ BatchesWithSchema batches;
+ batches.schema = schema;
+ int n_fields = schema->num_fields();
+ for (auto num_batch : num_batches.batches) {
+ std::vector<Datum> values;
+ for (int i = 0; i < n_fields; i++) {
+ auto type = schema->field(i)->type();
+ if (is_base_binary_like(type->id())) {
+ // casting to string first enables casting to binary
+ Datum as_string = Cast(num_batch.values[i], utf8()).ValueOrDie();
+ values.push_back(Cast(as_string, type).ValueOrDie());
+ } else {
+ values.push_back(num_batch.values[i]);
+ }
+ }
+ ExecBatch batch(values, num_batch.length);
+ batches.batches.push_back(batch);
+ }
+ return batches;
+}
+
+void BuildNullArray(std::shared_ptr<Array>& empty, const std::shared_ptr<DataType>& type,
+                    int64_t length) {
+ ASSERT_OK_AND_ASSIGN(auto builder, MakeBuilder(type, default_memory_pool()));
+ ASSERT_OK(builder->Reserve(length));
+ ASSERT_OK(builder->AppendNulls(length));
+ ASSERT_OK(builder->Finish(&empty));
+}
+
+void BuildZeroPrimitiveArray(std::shared_ptr<Array>& empty,
+                             const std::shared_ptr<DataType>& type, int64_t length) {
+ ASSERT_OK_AND_ASSIGN(auto builder, MakeBuilder(type, default_memory_pool()));
+ ASSERT_OK(builder->Reserve(length));
+ ASSERT_OK_AND_ASSIGN(auto scalar, MakeScalar(type, 0));
+ ASSERT_OK(builder->AppendScalar(*scalar, length));
+ ASSERT_OK(builder->Finish(&empty));
+}
+
+template <typename Builder>
+void BuildZeroBaseBinaryArray(std::shared_ptr<Array>& empty, int64_t length) {
+ Builder builder(default_memory_pool());
+ ASSERT_OK(builder.Reserve(length));
+ for (int64_t i = 0; i < length; i++) {
+ ASSERT_OK(builder.Append("0", /*length=*/1));
+ }
+ ASSERT_OK(builder.Finish(&empty));
+}
+
+// mutates by copying from_key into to_key and changing from_key to zero
+BatchesWithSchema MutateByKey(BatchesWithSchema& batches, std::string from_key,
+ std::string to_key, bool replace_key = false,
+ bool null_key = false) {
+ int from_index = batches.schema->GetFieldIndex(from_key);
+ int n_fields = batches.schema->num_fields();
+ auto fields = batches.schema->fields();
+ BatchesWithSchema new_batches;
+ auto new_field = batches.schema->field(from_index)->WithName(to_key);
+  new_batches.schema = (replace_key ? batches.schema->SetField(from_index, new_field)
+                                    : batches.schema->AddField(from_index, new_field))
+                           .ValueOrDie();
Review Comment:
Let's avoid `ValueOrDie` here and return `Result<BatchesWithSchema>`
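A minimal sketch of the `Result`-returning shape I mean (the `ReplaceOrAddField` helper below is hypothetical, just to show the pattern; `MutateByKey` itself would return `Result<BatchesWithSchema>` the same way):
```
#include <memory>
#include "arrow/result.h"
#include "arrow/type.h"

arrow::Result<std::shared_ptr<arrow::Schema>> ReplaceOrAddField(
    const std::shared_ptr<arrow::Schema>& schema, int index,
    const std::shared_ptr<arrow::Field>& new_field, bool replace) {
  // ARROW_ASSIGN_OR_RAISE propagates the error instead of aborting like ValueOrDie.
  ARROW_ASSIGN_OR_RAISE(auto new_schema, replace ? schema->SetField(index, new_field)
                                                 : schema->AddField(index, new_field));
  return new_schema;
}
```
Call sites in the test would then use `ASSERT_OK_AND_ASSIGN(l_batches, MutateByKey(...))`.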
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -74,237 +226,723 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
/*same_chunk_layout=*/true, /*flatten=*/true);
}
-void DoRunBasicTest(const std::vector<util::string_view>& l_data,
- const std::vector<util::string_view>& r0_data,
- const std::vector<util::string_view>& r1_data,
-                    const std::vector<util::string_view>& exp_data, int64_t tolerance) {
- auto l_schema =
-      schema({field("time", int64()), field("key", int32()), field("l_v0", float64())});
- auto r0_schema =
-      schema({field("time", int64()), field("key", int32()), field("r0_v0", float64())});
- auto r1_schema =
-      schema({field("time", int64()), field("key", int32()), field("r1_v0", float32())});
-
- auto exp_schema = schema({
- field("time", int64()),
- field("key", int32()),
- field("l_v0", float64()),
- field("r0_v0", float64()),
- field("r1_v0", float32()),
- });
-
- // Test three table join
- BatchesWithSchema l_batches, r0_batches, r1_batches, exp_batches;
- l_batches = MakeBatchesFromString(l_schema, l_data);
- r0_batches = MakeBatchesFromString(r0_schema, r0_data);
- r1_batches = MakeBatchesFromString(r1_schema, r1_data);
- exp_batches = MakeBatchesFromString(exp_schema, exp_data);
- CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time", "key",
- tolerance);
-}
-
-void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
- const std::shared_ptr<Schema>& r_schema) {
- BatchesWithSchema l_batches = MakeBatchesFromString(l_schema, {R"([])"});
- BatchesWithSchema r_batches = MakeBatchesFromString(r_schema, {R"([])"});
-
+#define CHECK_RUN_OUTPUT(by_key_type)                                            \
+  void CheckRunOutput(                                                           \
+      const BatchesWithSchema& l_batches, const BatchesWithSchema& r0_batches,   \
+      const BatchesWithSchema& r1_batches, const BatchesWithSchema& exp_batches, \
+      const FieldRef time, by_key_type keys, const int64_t tolerance) {          \
+      CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches,             \
+                     AsofJoinNodeOptions(time, keys, tolerance));                \
+  }
+
+EXPAND_BY_KEY_TYPE(CHECK_RUN_OUTPUT)
+
+void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str,
+ bool then_run_plan = false) {
Review Comment:
Minor nit, but maybe something like `fails_on_plan_create` would be a bit more straightforward.
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -602,52 +828,155 @@ class AsofJoinNode : public ExecNode {
public:
  AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
- const AsofJoinNodeOptions& join_options,
+ const vec_col_index_t& indices_of_on_key,
+               const std::vector<vec_col_index_t>& indices_of_by_key, OnType tolerance,
std::shared_ptr<Schema> output_schema);
+ Status InternalInit(bool must_hash, bool nullable_by_key,
+ std::vector<std::unique_ptr<KeyHasher>> key_hashers) {
+ key_hashers_.swap(key_hashers);
Review Comment:
Why not `key_hashers_ = std::move(key_hashers);`?
##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -397,24 +397,51 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
/// This node will output one row for each row in the left table.
class ARROW_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
public:
- AsofJoinNodeOptions(FieldRef on_key, FieldRef by_key, int64_t tolerance)
-      : on_key(std::move(on_key)), by_key(std::move(by_key)), tolerance(tolerance) {}
+  AsofJoinNodeOptions(FieldRef on_key, const FieldRef& by_key, int64_t tolerance,
+ bool nullable_by_key = false)
+ : on_key(std::move(on_key)),
+ by_key(),
+ tolerance(tolerance),
+ nullable_by_key(nullable_by_key) {
+ this->by_key.push_back(std::move(by_key));
+ }
- /// \brief "on" key for the join. Each
+  AsofJoinNodeOptions(FieldRef on_key, std::vector<FieldRef> by_key, int64_t tolerance,
+ bool nullable_by_key = false)
+ : on_key(std::move(on_key)),
+ by_key(by_key),
Review Comment:
I didn't know this. I normally still do the move but it [appears you are correct](https://godbolt.org/z/sx6dM77cW) about the copy elision. CC @bkietz for a second opinion just to make sure, but I think this is fine.
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -17,33 +17,69 @@
#include <condition_variable>
#include <mutex>
-#include <set>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
+#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/schema_util.h"
#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
namespace arrow {
namespace compute {
-// Remove this when multiple keys and/or types is supported
-typedef int32_t KeyType;
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+ return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+ return container.end() != std_find(container, val);
+}
+
+typedef uint64_t ByType;
+typedef uint64_t OnType;
+typedef uint64_t HashType;
// Maximum number of tables that can be joined
#define MAX_JOIN_TABLES 64
typedef uint64_t row_index_t;
typedef int col_index_t;
+typedef std::vector<col_index_t> vec_col_index_t;
Review Comment:
I feel this makes things harder for new readers just to save a few chars.
##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -135,15 +135,33 @@ class ARROW_EXPORT KeyColumnArray {
/// Only valid if this is a view into a varbinary type
uint32_t* mutable_offsets() {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
Review Comment:
```suggestion
DCHECK_EQ(metadata_.fixed_length, sizeof(uint32_t));
```
##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -135,15 +135,33 @@ class ARROW_EXPORT KeyColumnArray {
/// Only valid if this is a view into a varbinary type
uint32_t* mutable_offsets() {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
}
/// \brief Return a read-only version of the offsets buffer
///
/// Only valid if this is a view into a varbinary type
const uint32_t* offsets() const {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
return reinterpret_cast<const uint32_t*>(data(kFixedLengthBuffer));
}
+ /// \brief Return a mutable version of the large-offsets buffer
+ ///
+ /// Only valid if this is a view into a large varbinary type
+ uint64_t* mutable_large_offsets() {
+ DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint64_t));
+ return reinterpret_cast<uint64_t*>(mutable_data(kFixedLengthBuffer));
+ }
+ /// \brief Return a read-only version of the large-offsets buffer
+ ///
+ /// Only valid if this is a view into a large varbinary type
+ const uint64_t* large_offsets() const {
+ DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint64_t));
Review Comment:
```suggestion
DCHECK_EQ(metadata_.fixed_length, sizeof(uint64_t));
```
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -17,33 +17,69 @@
#include <condition_variable>
#include <mutex>
-#include <set>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
+#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/schema_util.h"
#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
namespace arrow {
namespace compute {
-// Remove this when multiple keys and/or types is supported
-typedef int32_t KeyType;
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+ return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+ return container.end() != std_find(container, val);
+}
+
+typedef uint64_t ByType;
+typedef uint64_t OnType;
+typedef uint64_t HashType;
// Maximum number of tables that can be joined
#define MAX_JOIN_TABLES 64
typedef uint64_t row_index_t;
typedef int col_index_t;
+typedef std::vector<col_index_t> vec_col_index_t;
+
+// normalize the value to 64-bits while preserving ordering of values
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+static inline uint64_t norm_value(T t) {
+  uint64_t bias = std::is_signed<T>::value ? (uint64_t)1 << (8 * sizeof(T) - 1) : 0;
+ return t < 0 ? static_cast<uint64_t>(t + bias) : static_cast<uint64_t>(t);
+}
+
+// indicates normalization of a time value
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+static inline uint64_t time_value(T t) {
+ return norm_value(t);
+}
+
+// indicates normalization of a key value
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+static inline uint64_t key_value(T t) {
+ return norm_value(t);
+}
Review Comment:
Are you expecting the behavior here to diverge at some point? Why have two
methods that are simply wrapping `norm_value`?
##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -135,15 +135,33 @@ class ARROW_EXPORT KeyColumnArray {
/// Only valid if this is a view into a varbinary type
uint32_t* mutable_offsets() {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
}
/// \brief Return a read-only version of the offsets buffer
///
/// Only valid if this is a view into a varbinary type
const uint32_t* offsets() const {
DCHECK(!metadata_.is_fixed_length);
+ DCHECK(metadata_.fixed_length == sizeof(uint32_t));
Review Comment:
```suggestion
DCHECK_EQ(metadata_.fixed_length, sizeof(uint32_t));
```
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -602,52 +828,155 @@ class AsofJoinNode : public ExecNode {
public:
  AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
- const AsofJoinNodeOptions& join_options,
+ const vec_col_index_t& indices_of_on_key,
+               const std::vector<vec_col_index_t>& indices_of_by_key, OnType tolerance,
std::shared_ptr<Schema> output_schema);
+ Status InternalInit(bool must_hash, bool nullable_by_key,
+ std::vector<std::unique_ptr<KeyHasher>> key_hashers) {
+ key_hashers_.swap(key_hashers);
+ auto inputs = this->inputs();
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ state_.push_back(::arrow::internal::make_unique<InputState>(
+          must_hash, nullable_by_key, key_hashers_[i].get(), inputs[i]->output_schema(),
+ indices_of_on_key_[i], indices_of_by_key_[i]));
+ }
+
+ col_index_t dst_offset = 0;
+ for (auto& state : state_)
+ dst_offset = state->InitSrcToDstMapping(dst_offset, !!dst_offset);
+
+ return Status::OK();
+ }
+
virtual ~AsofJoinNode() {
process_.Push(false); // poison pill
process_thread_.join();
}
+ const vec_col_index_t& indices_of_on_key() { return indices_of_on_key_; }
+  const std::vector<vec_col_index_t>& indices_of_by_key() { return indices_of_by_key_; }
+
static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
-      const std::vector<ExecNode*>& inputs, const AsofJoinNodeOptions& options) {
+      const std::vector<ExecNode*>& inputs, const vec_col_index_t& indices_of_on_key,
+ const std::vector<vec_col_index_t>& indices_of_by_key) {
std::vector<std::shared_ptr<arrow::Field>> fields;
- const auto& on_field_name = *options.on_key.name();
- const auto& by_field_name = *options.by_key.name();
-
+ size_t n_by = indices_of_by_key[0].size();
+ const DataType* on_key_type = NULLPTR;
+ std::vector<const DataType*> by_key_type(n_by, NULLPTR);
// Take all non-key, non-time RHS fields
for (size_t j = 0; j < inputs.size(); ++j) {
const auto& input_schema = inputs[j]->output_schema();
- const auto& on_field_ix = input_schema->GetFieldIndex(on_field_name);
- const auto& by_field_ix = input_schema->GetFieldIndex(by_field_name);
+ const auto& on_field_ix = indices_of_on_key[j];
+ const auto& by_field_ix = indices_of_by_key[j];
- if ((on_field_ix == -1) | (by_field_ix == -1)) {
+ if ((on_field_ix == -1) || std_has(by_field_ix, -1)) {
return Status::Invalid("Missing join key on table ", j);
}
+ const auto& on_field = input_schema->fields()[on_field_ix];
+ std::vector<const Field*> by_field(n_by);
+ for (size_t k = 0; k < n_by; k++) {
+ by_field[k] = input_schema->fields()[by_field_ix[k]].get();
+ }
+
+ if (on_key_type == NULLPTR) {
+ on_key_type = on_field->type().get();
+ } else if (*on_key_type != *on_field->type()) {
+        return Status::Invalid("Expected on-key type ", *on_key_type, " but got ",
+                               *on_field->type(), " for field ", on_field->name(),
+ " in input ", j);
+ }
+ for (size_t k = 0; k < n_by; k++) {
+ if (by_key_type[k] == NULLPTR) {
+ by_key_type[k] = by_field[k]->type().get();
+ } else if (*by_key_type[k] != *by_field[k]->type()) {
+          return Status::Invalid("Expected on-key type ", *by_key_type[k], " but got ",
+                                 *by_field[k]->type(), " for field ", by_field[k]->name(),
+ " in input ", j);
+ }
+ }
+
for (int i = 0; i < input_schema->num_fields(); ++i) {
const auto field = input_schema->field(i);
- if (field->name() == on_field_name) {
-        if (kSupportedOnTypes_.find(field->type()) == kSupportedOnTypes_.end()) {
-          return Status::Invalid("Unsupported type for on key: ", field->name());
+ if (i == on_field_ix) {
Review Comment:
Can the switch statements in this method be put into helper methods like `is_valid_on_field`, etc.?
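For example, a hypothetical helper along these lines (mirroring the on-key cases that appear later in this hunk), with analogous by-key / payload variants adding the binary and floating-point cases:
```
#include "arrow/type_fwd.h"

// Hypothetical helper; not part of the PR as written.
static bool IsSupportedOnKeyType(arrow::Type::type id) {
  switch (id) {
    case arrow::Type::INT8:
    case arrow::Type::INT16:
    case arrow::Type::INT32:
    case arrow::Type::INT64:
    case arrow::Type::UINT8:
    case arrow::Type::UINT16:
    case arrow::Type::UINT32:
    case arrow::Type::UINT64:
    case arrow::Type::DATE32:
    case arrow::Type::DATE64:
    case arrow::Type::TIME32:
    case arrow::Type::TIME64:
    case arrow::Type::TIMESTAMP:
      return true;
    default:
      return false;
  }
}

// MakeOutputSchema would then reduce to something like:
//   if (i == on_field_ix && !IsSupportedOnKeyType(field->type()->id())) {
//     return Status::Invalid("Unsupported type for on-key ", field->name());
//   }
```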
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -602,52 +828,155 @@ class AsofJoinNode : public ExecNode {
public:
  AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
- const AsofJoinNodeOptions& join_options,
+ const vec_col_index_t& indices_of_on_key,
+               const std::vector<vec_col_index_t>& indices_of_by_key, OnType tolerance,
std::shared_ptr<Schema> output_schema);
+ Status InternalInit(bool must_hash, bool nullable_by_key,
Review Comment:
`InternalInit` is a bit of an odd name. It sort of looks like the `Make`
method was somewhat arbitrarily split in two. Did you intend to call this
somewhere else too?
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -657,34 +986,85 @@ class AsofJoinNode : public ExecNode {
return std::make_shared<arrow::Schema>(fields);
}
+ static inline Result<col_index_t> FindColIndex(const Schema& schema,
+ const FieldRef& field_ref,
+                                                 const util::string_view& key_kind) {
+ auto match_res = field_ref.FindOne(schema);
+ if (!match_res.ok()) {
+      return Status::Invalid("Bad join key on table : ", match_res.status().message());
+ }
+ auto match = match_res.ValueOrDie();
+ if (match.indices().size() != 1) {
+ return Status::Invalid("AsOfJoinNode does not support a nested ",
+                             to_string(key_kind), "-key ", field_ref.ToString());
+ }
+ return match.indices()[0];
+ }
+
  static arrow::Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
DCHECK_GE(inputs.size(), 2) << "Must have at least two inputs";
    const auto& join_options = checked_cast<const AsofJoinNodeOptions&>(options);
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Schema> output_schema,
- MakeOutputSchema(inputs, join_options));
+ if (join_options.by_key.size() == 0) {
+ return Status::Invalid("AsOfJoin by_key must not be empty");
+ }
Review Comment:
Is that true? For some reason I thought the by key was optional and the on
key was the only required key.
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -459,17 +642,58 @@ class CompositeReferenceTable {
if (!_ptr2ref.count((uintptr_t)ref.get())) _ptr2ref[(uintptr_t)ref.get()]
= ref;
}
- template <class Builder, class PrimitiveType>
-  Result<std::shared_ptr<Array>> MaterializePrimitiveColumn(MemoryPool* memory_pool,
-                                                             size_t i_table,
-                                                             col_index_t i_col) {
- Builder builder(memory_pool);
+ // this should really be a method on ArrayData
+  static bool IsNull(const std::shared_ptr<ArrayData>& source, row_index_t row) {
+ return ((source->buffers[0] != NULLPTR)
+                ? !bit_util::GetBit(source->buffers[0]->data(), row + source->offset)
+ : source->null_count.load() == source->length);
+ }
+
+ template <typename T>
+ using is_fixed_width_type = std::is_base_of<FixedWidthType, T>;
+
+ template <typename T, typename R = void>
+  using enable_if_fixed_width_type = enable_if_t<is_fixed_width_type<T>::value, R>;
Review Comment:
Should you add these to `src/arrow/type_traits.h`?
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -602,52 +828,155 @@ class AsofJoinNode : public ExecNode {
public:
AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string>
input_labels,
- const AsofJoinNodeOptions& join_options,
+ const vec_col_index_t& indices_of_on_key,
+ const std::vector<vec_col_index_t>& indices_of_by_key, OnType
tolerance,
std::shared_ptr<Schema> output_schema);
+ Status InternalInit(bool must_hash, bool nullable_by_key,
+ std::vector<std::unique_ptr<KeyHasher>> key_hashers) {
+ key_hashers_.swap(key_hashers);
+ auto inputs = this->inputs();
+ for (size_t i = 0; i < inputs.size(); ++i) {
+ state_.push_back(::arrow::internal::make_unique<InputState>(
+ must_hash, nullable_by_key, key_hashers_[i].get(),
inputs[i]->output_schema(),
+ indices_of_on_key_[i], indices_of_by_key_[i]));
+ }
+
+ col_index_t dst_offset = 0;
+ for (auto& state : state_)
+ dst_offset = state->InitSrcToDstMapping(dst_offset, !!dst_offset);
+
+ return Status::OK();
+ }
+
virtual ~AsofJoinNode() {
process_.Push(false); // poison pill
process_thread_.join();
}
+ const vec_col_index_t& indices_of_on_key() { return indices_of_on_key_; }
+ const std::vector<vec_col_index_t>& indices_of_by_key() { return
indices_of_by_key_; }
+
static arrow::Result<std::shared_ptr<Schema>> MakeOutputSchema(
- const std::vector<ExecNode*>& inputs, const AsofJoinNodeOptions&
options) {
+ const std::vector<ExecNode*>& inputs, const vec_col_index_t&
indices_of_on_key,
+ const std::vector<vec_col_index_t>& indices_of_by_key) {
std::vector<std::shared_ptr<arrow::Field>> fields;
- const auto& on_field_name = *options.on_key.name();
- const auto& by_field_name = *options.by_key.name();
-
+ size_t n_by = indices_of_by_key[0].size();
+ const DataType* on_key_type = NULLPTR;
+ std::vector<const DataType*> by_key_type(n_by, NULLPTR);
// Take all non-key, non-time RHS fields
for (size_t j = 0; j < inputs.size(); ++j) {
const auto& input_schema = inputs[j]->output_schema();
- const auto& on_field_ix = input_schema->GetFieldIndex(on_field_name);
- const auto& by_field_ix = input_schema->GetFieldIndex(by_field_name);
+ const auto& on_field_ix = indices_of_on_key[j];
+ const auto& by_field_ix = indices_of_by_key[j];
- if ((on_field_ix == -1) | (by_field_ix == -1)) {
+ if ((on_field_ix == -1) || std_has(by_field_ix, -1)) {
return Status::Invalid("Missing join key on table ", j);
}
+ const auto& on_field = input_schema->fields()[on_field_ix];
+ std::vector<const Field*> by_field(n_by);
+ for (size_t k = 0; k < n_by; k++) {
+ by_field[k] = input_schema->fields()[by_field_ix[k]].get();
+ }
+
+ if (on_key_type == NULLPTR) {
+ on_key_type = on_field->type().get();
+ } else if (*on_key_type != *on_field->type()) {
+ return Status::Invalid("Expected on-key type ", *on_key_type, " but
got ",
+ *on_field->type(), " for field ",
on_field->name(),
+ " in input ", j);
+ }
+ for (size_t k = 0; k < n_by; k++) {
+ if (by_key_type[k] == NULLPTR) {
+ by_key_type[k] = by_field[k]->type().get();
+ } else if (*by_key_type[k] != *by_field[k]->type()) {
+ return Status::Invalid("Expected on-key type ", *by_key_type[k], "
but got ",
+ *by_field[k]->type(), " for field ",
by_field[k]->name(),
+ " in input ", j);
+ }
+ }
+
for (int i = 0; i < input_schema->num_fields(); ++i) {
const auto field = input_schema->field(i);
- if (field->name() == on_field_name) {
- if (kSupportedOnTypes_.find(field->type()) ==
kSupportedOnTypes_.end()) {
- return Status::Invalid("Unsupported type for on key: ",
field->name());
+ if (i == on_field_ix) {
+ switch (field->type()->id()) {
+ case Type::INT8:
+ case Type::INT16:
+ case Type::INT32:
+ case Type::INT64:
+ case Type::UINT8:
+ case Type::UINT16:
+ case Type::UINT32:
+ case Type::UINT64:
+ case Type::DATE32:
+ case Type::DATE64:
+ case Type::TIME32:
+ case Type::TIME64:
+ case Type::TIMESTAMP:
+ break;
+ default:
+ return Status::Invalid("Unsupported type for on-key ",
field->name(), " : ",
+ field->type()->ToString());
}
// Only add on field from the left table
if (j == 0) {
fields.push_back(field);
}
- } else if (field->name() == by_field_name) {
- if (kSupportedByTypes_.find(field->type()) ==
kSupportedByTypes_.end()) {
- return Status::Invalid("Unsupported type for by key: ",
field->name());
+ } else if (std_has(by_field_ix, i)) {
+ switch (field->type()->id()) {
+ case Type::INT8:
+ case Type::INT16:
+ case Type::INT32:
+ case Type::INT64:
+ case Type::UINT8:
+ case Type::UINT16:
+ case Type::UINT32:
+ case Type::UINT64:
+ case Type::DATE32:
+ case Type::DATE64:
+ case Type::TIME32:
+ case Type::TIME64:
+ case Type::TIMESTAMP:
+ case Type::STRING:
+ case Type::LARGE_STRING:
+ case Type::BINARY:
+ case Type::LARGE_BINARY:
+ break;
+ default:
+ return Status::Invalid("Unsupported type for by-key ",
field->name(), " : ",
+ field->type()->ToString());
}
// Only add by field from the left table
if (j == 0) {
fields.push_back(field);
}
} else {
- if (kSupportedDataTypes_.find(field->type()) ==
kSupportedDataTypes_.end()) {
- return Status::Invalid("Unsupported data type: ", field->name());
+ switch (field->type()->id()) {
+ case Type::INT8:
+ case Type::INT16:
+ case Type::INT32:
+ case Type::INT64:
+ case Type::UINT8:
+ case Type::UINT16:
+ case Type::UINT32:
+ case Type::UINT64:
+ case Type::FLOAT:
+ case Type::DOUBLE:
+ case Type::DATE32:
+ case Type::DATE64:
+ case Type::TIME32:
+ case Type::TIME64:
+ case Type::TIMESTAMP:
+ case Type::STRING:
+ case Type::LARGE_STRING:
+ case Type::BINARY:
+ case Type::LARGE_BINARY:
+ break;
+ default:
+            return Status::Invalid("Unsupported type for field ", field->name(), " : ",
Review Comment:
```suggestion
            return Status::Invalid("Unsupported type for payload field ", field->name(), " : ",
```
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -17,33 +17,69 @@
#include <condition_variable>
#include <mutex>
-#include <set>
#include <thread>
#include <unordered_map>
+#include <unordered_set>
+#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/schema_util.h"
#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
+#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/optional.h"
+#include "arrow/util/string_view.h"
namespace arrow {
namespace compute {
-// Remove this when multiple keys and/or types is supported
-typedef int32_t KeyType;
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+ return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+ return container.end() != std_find(container, val);
+}
+
+typedef uint64_t ByType;
+typedef uint64_t OnType;
+typedef uint64_t HashType;
Review Comment:
Right now this class only works if `HashType` and `OnType` are the same
type. Do you anticipate these types would diverge at some point?
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -32,23 +35,172 @@
#include "arrow/testing/random.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/make_unique.h"
+#include "arrow/util/string_view.h"
#include "arrow/util/thread_pool.h"
+#define TRACED_TEST(t_class, t_name) \
+ static void _##t_class##_##t_name(); \
+ TEST(t_class, t_name) { \
+ ARROW_SCOPED_TRACE(#t_class "_" #t_name); \
+ _##t_class##_##t_name(); \
+ } \
+ static void _##t_class##_##t_name()
+
using testing::UnorderedElementsAreArray;
namespace arrow {
namespace compute {
+bool is_temporal_primitive(Type::type type_id) {
+ switch (type_id) {
+ case Type::TIME32:
+ case Type::TIME64:
+ case Type::DATE32:
+ case Type::DATE64:
+ case Type::TIMESTAMP:
+ return true;
+ default:
+ return false;
+ }
+}
+
+BatchesWithSchema MakeBatchesFromNumString(
+ const std::shared_ptr<Schema>& schema,
+ const std::vector<util::string_view>& json_strings, int multiplicity = 1) {
+ FieldVector num_fields;
+ for (auto field : schema->fields()) {
+ num_fields.push_back(
+        is_base_binary_like(field->type()->id()) ? field->WithType(int64()) : field);
+ }
+ auto num_schema =
+      std::make_shared<Schema>(num_fields, schema->endianness(), schema->metadata());
+ BatchesWithSchema num_batches =
+ MakeBatchesFromString(num_schema, json_strings, multiplicity);
+ BatchesWithSchema batches;
+ batches.schema = schema;
+ int n_fields = schema->num_fields();
+ for (auto num_batch : num_batches.batches) {
+ std::vector<Datum> values;
+ for (int i = 0; i < n_fields; i++) {
+ auto type = schema->field(i)->type();
+ if (is_base_binary_like(type->id())) {
+ // casting to string first enables casting to binary
+ Datum as_string = Cast(num_batch.values[i], utf8()).ValueOrDie();
+ values.push_back(Cast(as_string, type).ValueOrDie());
+ } else {
+ values.push_back(num_batch.values[i]);
+ }
+ }
+ ExecBatch batch(values, num_batch.length);
+ batches.batches.push_back(batch);
+ }
+ return batches;
+}
+
+void BuildNullArray(std::shared_ptr<Array>& empty, const std::shared_ptr<DataType>& type,
+ int64_t length) {
+ ASSERT_OK_AND_ASSIGN(auto builder, MakeBuilder(type, default_memory_pool()));
+ ASSERT_OK(builder->Reserve(length));
+ ASSERT_OK(builder->AppendNulls(length));
+ ASSERT_OK(builder->Finish(&empty));
+}
+
+void BuildZeroPrimitiveArray(std::shared_ptr<Array>& empty,
+                             const std::shared_ptr<DataType>& type, int64_t length) {
+ ASSERT_OK_AND_ASSIGN(auto builder, MakeBuilder(type, default_memory_pool()));
+ ASSERT_OK(builder->Reserve(length));
+ ASSERT_OK_AND_ASSIGN(auto scalar, MakeScalar(type, 0));
+ ASSERT_OK(builder->AppendScalar(*scalar, length));
Review Comment:
It would be nice if scalars could be avoided but probably not a big deal for
a unit test.
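One possible way to sidestep the scalar, assuming `ArrayBuilder::AppendEmptyValues` (which appends non-null zero/empty values for the builders used here) is acceptable for the types exercised, would be a drop-in variant of the same helper:
```
void BuildZeroPrimitiveArray(std::shared_ptr<Array>& empty,
                             const std::shared_ptr<DataType>& type, int64_t length) {
  ASSERT_OK_AND_ASSIGN(auto builder, MakeBuilder(type, default_memory_pool()));
  ASSERT_OK(builder->Reserve(length));
  // Appends non-null "empty" (zero) values without constructing a Scalar.
  ASSERT_OK(builder->AppendEmptyValues(length));
  ASSERT_OK(builder->Finish(&empty));
}
```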
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -137,18 +173,88 @@ struct MemoStore {
}
};
+// a specialized higher-performance variation of Hashing64 logic from hash_join_node
+// the code here avoids recreating objects that are independent of each batch processed
+class KeyHasher {
+ static constexpr int kMiniBatchLength = util::MiniBatch::kMiniBatchLength;
+
+ public:
+ explicit KeyHasher(const vec_col_index_t& indices)
+ : indices_(indices),
+ metadata_(indices.size()),
+ batch_(NULLPTR),
+ hashes_(),
+ ctx_(),
+ column_arrays_(),
+ stack_() {
+ ctx_.stack = &stack_;
+ column_arrays_.resize(indices.size());
+ }
+
+  Status Init(ExecContext* exec_context, const std::shared_ptr<arrow::Schema>& schema) {
+ ctx_.hardware_flags = exec_context->cpu_info()->hardware_flags();
+ const auto& fields = schema->fields();
+ for (size_t k = 0; k < metadata_.size(); k++) {
+ ARROW_ASSIGN_OR_RAISE(metadata_[k],
+                            ColumnMetadataFromDataType(fields[indices_[k]]->type()));
+ }
+ return stack_.Init(exec_context->memory_pool(),
+ 4 * kMiniBatchLength * sizeof(uint32_t));
Review Comment:
I believe the plan is that, at some point, a node should be able to obtain a `util::TempVectorStack` and `LightContext` from the plan instead of every node needing to maintain its own.
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -74,237 +226,723 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
/*same_chunk_layout=*/true, /*flatten=*/true);
}
-void DoRunBasicTest(const std::vector<util::string_view>& l_data,
- const std::vector<util::string_view>& r0_data,
- const std::vector<util::string_view>& r1_data,
- const std::vector<util::string_view>& exp_data, int64_t
tolerance) {
- auto l_schema =
- schema({field("time", int64()), field("key", int32()), field("l_v0",
float64())});
- auto r0_schema =
- schema({field("time", int64()), field("key", int32()), field("r0_v0",
float64())});
- auto r1_schema =
- schema({field("time", int64()), field("key", int32()), field("r1_v0",
float32())});
-
- auto exp_schema = schema({
- field("time", int64()),
- field("key", int32()),
- field("l_v0", float64()),
- field("r0_v0", float64()),
- field("r1_v0", float32()),
- });
-
- // Test three table join
- BatchesWithSchema l_batches, r0_batches, r1_batches, exp_batches;
- l_batches = MakeBatchesFromString(l_schema, l_data);
- r0_batches = MakeBatchesFromString(r0_schema, r0_data);
- r1_batches = MakeBatchesFromString(r1_schema, r1_data);
- exp_batches = MakeBatchesFromString(exp_schema, exp_data);
- CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time", "key",
- tolerance);
-}
-
-void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
- const std::shared_ptr<Schema>& r_schema) {
- BatchesWithSchema l_batches = MakeBatchesFromString(l_schema, {R"([])"});
- BatchesWithSchema r_batches = MakeBatchesFromString(r_schema, {R"([])"});
-
+#define CHECK_RUN_OUTPUT(by_key_type)
\
+ void CheckRunOutput(
\
+ const BatchesWithSchema& l_batches, const BatchesWithSchema& r0_batches,
\
+ const BatchesWithSchema& r1_batches, const BatchesWithSchema&
exp_batches, \
+ const FieldRef time, by_key_type keys, const int64_t tolerance) {
\
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches,
\
+ AsofJoinNodeOptions(time, keys, tolerance));
\
+ }
+
+EXPAND_BY_KEY_TYPE(CHECK_RUN_OUTPUT)
+
+void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str,
+ bool then_run_plan = false) {
ExecContext exec_ctx;
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
- AsofJoinNodeOptions join_options("time", "key", 0);
Declaration join{"asofjoin", join_options};
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{l_batches.schema, l_batches.gen(false,
false)}});
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{r_batches.schema, r_batches.gen(false,
false)}});
- ASSERT_RAISES(Invalid, join.AddToPlan(plan.get()));
+ if (then_run_plan) {
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+ ASSERT_OK(Declaration::Sequence({join, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(Invalid,
+
::testing::HasSubstr(expected_error_str),
+ StartAndCollect(plan.get(),
sink_gen));
+ } else {
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr(expected_error_str),
+ join.AddToPlan(plan.get()));
+ }
+}
+
+void DoRunInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ DoInvalidPlanTest(l_batches, r_batches, join_options, expected_error_str);
+}
+
+void DoRunInvalidPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ BatchesWithSchema l_batches = MakeBatchesFromNumString(l_schema, {R"([])"});
+ BatchesWithSchema r_batches = MakeBatchesFromNumString(r_schema, {R"([])"});
+
+ return DoRunInvalidPlanTest(l_batches, r_batches, join_options,
expected_error_str);
+}
+
+void DoRunInvalidPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema, int64_t
tolerance,
+ const std::string& expected_error_str) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time", "key",
tolerance),
+ expected_error_str);
+}
+
+void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Unsupported type for ");
+}
+
+void DoRunInvalidToleranceTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, -1,
+ "AsOfJoin tolerance must be non-negative but is ");
+}
+
+void DoRunMissingKeysTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table : No
match");
+}
+
+void DoRunEmptyByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time", {}, 0),
+ "AsOfJoin by_key must not be empty");
+}
+
+void DoRunMissingOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("invalid_time",
"key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunMissingByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time",
"invalid_key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunNestedOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions({0, "time"},
"key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunNestedByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time",
FieldRef{0, 1}, 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunAmbiguousOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+  DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table : Multiple matches");
+}
+
+void DoRunAmbiguousByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+  DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table : Multiple matches");
+}
+
+std::string GetJsonString(int n_rows, int n_cols, bool unordered = false) {
Review Comment:
Nit. Maybe `GetMonotonicSequenceAsJsonString` or `GetTestSequenceAsJsonString`, plus some documentation around what this is doing. Also, if you're creating dependable sequences like this, then what is the advantage of going to JSON?
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -657,34 +986,85 @@ class AsofJoinNode : public ExecNode {
return std::make_shared<arrow::Schema>(fields);
}
+ static inline Result<col_index_t> FindColIndex(const Schema& schema,
+ const FieldRef& field_ref,
+                                                 const util::string_view& key_kind) {
Review Comment:
`string_view` is roughly
```
{
char* str;
size_t len;
}
```
So it's basically "pointer + size". You are correct that we can zero-copy
from `string` into `string_view`. However, the `to_string` you call below is
not zero-copy and you would probably be better off with `const std::string&
key_kind`.
Also, in the future, when you do want to use `string_view`, you should just
do `string_view key_kind` and not `const string_view& key_kind`, as it is small enough that it doesn't need to be passed by reference and is better passed by value (like a pointer).
##########
cpp/src/arrow/compute/exec/asof_join_node.cc:
##########
@@ -294,10 +452,22 @@ class InputState {
// Index of the time col
col_index_t time_col_index_;
// Index of the key col
- col_index_t key_col_index_;
+ vec_col_index_t key_col_index_;
+ // Type id of the time column
+ Type::type time_type_id_;
+ // Type id of the key column
+ std::vector<Type::type> key_type_id_;
+ // Hasher for key elements
+ mutable KeyHasher* key_hasher_;
+ // True if hashing is mandatory
+ bool must_hash_;
+ // True if null by-key values are expected
+ bool nullable_by_key_;
Review Comment:
Does the faster path not work if the column is nullable?
##########
cpp/src/arrow/compute/exec/asof_join_node_test.cc:
##########
@@ -74,237 +226,723 @@ void CheckRunOutput(const BatchesWithSchema& l_batches,
/*same_chunk_layout=*/true, /*flatten=*/true);
}
-void DoRunBasicTest(const std::vector<util::string_view>& l_data,
- const std::vector<util::string_view>& r0_data,
- const std::vector<util::string_view>& r1_data,
- const std::vector<util::string_view>& exp_data, int64_t
tolerance) {
- auto l_schema =
- schema({field("time", int64()), field("key", int32()), field("l_v0",
float64())});
- auto r0_schema =
- schema({field("time", int64()), field("key", int32()), field("r0_v0",
float64())});
- auto r1_schema =
- schema({field("time", int64()), field("key", int32()), field("r1_v0",
float32())});
-
- auto exp_schema = schema({
- field("time", int64()),
- field("key", int32()),
- field("l_v0", float64()),
- field("r0_v0", float64()),
- field("r1_v0", float32()),
- });
-
- // Test three table join
- BatchesWithSchema l_batches, r0_batches, r1_batches, exp_batches;
- l_batches = MakeBatchesFromString(l_schema, l_data);
- r0_batches = MakeBatchesFromString(r0_schema, r0_data);
- r1_batches = MakeBatchesFromString(r1_schema, r1_data);
- exp_batches = MakeBatchesFromString(exp_schema, exp_data);
- CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time", "key",
- tolerance);
-}
-
-void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
- const std::shared_ptr<Schema>& r_schema) {
- BatchesWithSchema l_batches = MakeBatchesFromString(l_schema, {R"([])"});
- BatchesWithSchema r_batches = MakeBatchesFromString(r_schema, {R"([])"});
-
+#define CHECK_RUN_OUTPUT(by_key_type)
\
+ void CheckRunOutput(
\
+ const BatchesWithSchema& l_batches, const BatchesWithSchema& r0_batches,
\
+ const BatchesWithSchema& r1_batches, const BatchesWithSchema&
exp_batches, \
+ const FieldRef time, by_key_type keys, const int64_t tolerance) {
\
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches,
\
+ AsofJoinNodeOptions(time, keys, tolerance));
\
+ }
+
+EXPAND_BY_KEY_TYPE(CHECK_RUN_OUTPUT)
+
+void DoInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str,
+ bool then_run_plan = false) {
ExecContext exec_ctx;
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
- AsofJoinNodeOptions join_options("time", "key", 0);
Declaration join{"asofjoin", join_options};
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{l_batches.schema, l_batches.gen(false,
false)}});
join.inputs.emplace_back(Declaration{
"source", SourceNodeOptions{r_batches.schema, r_batches.gen(false,
false)}});
- ASSERT_RAISES(Invalid, join.AddToPlan(plan.get()));
+ if (then_run_plan) {
+ AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+ ASSERT_OK(Declaration::Sequence({join, {"sink",
SinkNodeOptions{&sink_gen}}})
+ .AddToPlan(plan.get()));
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(Invalid,
+
::testing::HasSubstr(expected_error_str),
+ StartAndCollect(plan.get(),
sink_gen));
+ } else {
+ EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr(expected_error_str),
+ join.AddToPlan(plan.get()));
+ }
+}
+
+void DoRunInvalidPlanTest(const BatchesWithSchema& l_batches,
+ const BatchesWithSchema& r_batches,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ DoInvalidPlanTest(l_batches, r_batches, join_options, expected_error_str);
+}
+
+void DoRunInvalidPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ BatchesWithSchema l_batches = MakeBatchesFromNumString(l_schema, {R"([])"});
+ BatchesWithSchema r_batches = MakeBatchesFromNumString(r_schema, {R"([])"});
+
+ return DoRunInvalidPlanTest(l_batches, r_batches, join_options,
expected_error_str);
+}
+
+void DoRunInvalidPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema, int64_t
tolerance,
+ const std::string& expected_error_str) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time", "key",
tolerance),
+ expected_error_str);
+}
+
+void DoRunInvalidTypeTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Unsupported type for ");
+}
+
+void DoRunInvalidToleranceTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, -1,
+ "AsOfJoin tolerance must be non-negative but is ");
+}
+
+void DoRunMissingKeysTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table : No
match");
+}
+
+void DoRunEmptyByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time", {}, 0),
+ "AsOfJoin by_key must not be empty");
+}
+
+void DoRunMissingOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("invalid_time",
"key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunMissingByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time",
"invalid_key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunNestedOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions({0, "time"},
"key", 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunNestedByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, AsofJoinNodeOptions("time",
FieldRef{0, 1}, 0),
+ "Bad join key on table : No match");
+}
+
+void DoRunAmbiguousOnKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table :
Multiple matches");
+}
+
+void DoRunAmbiguousByKeyTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunInvalidPlanTest(l_schema, r_schema, 0, "Bad join key on table :
Multiple matches");
+}
+
+std::string GetJsonString(int n_rows, int n_cols, bool unordered = false) {
+ std::stringstream s;
+ s << '[';
+ for (int i = 0; i < n_rows; i++) {
+ if (i > 0) {
+ s << ", ";
+ }
+ s << '[';
+ for (int j = 0; j < n_cols; j++) {
+ if (j > 0) {
+ s << ", " << j;
+ } else if (j < 2) {
+ s << (i ^ unordered);
+ } else {
+ s << i;
+ }
+ }
+ s << ']';
+ }
+ s << ']';
+ return s.str();
+}
+
+void DoRunUnorderedPlanTest(bool l_unordered, bool r_unordered,
+ const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema,
+ const AsofJoinNodeOptions& join_options,
+ const std::string& expected_error_str) {
+ ASSERT_TRUE(l_unordered || r_unordered);
+ int n_rows = 5;
+ std::string l_str = GetJsonString(n_rows, l_schema->num_fields(),
l_unordered);
+ std::string r_str = GetJsonString(n_rows, r_schema->num_fields(),
r_unordered);
+ BatchesWithSchema l_batches = MakeBatchesFromNumString(l_schema, {l_str});
+ BatchesWithSchema r_batches = MakeBatchesFromNumString(r_schema, {r_str});
+
+ return DoInvalidPlanTest(l_batches, r_batches, join_options,
expected_error_str,
+ /*then_run_plan=*/true);
+}
+
+void DoRunUnorderedPlanTest(bool l_unordered, bool r_unordered,
+ const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ DoRunUnorderedPlanTest(l_unordered, r_unordered, l_schema, r_schema,
+ AsofJoinNodeOptions("time", "key", 1000),
+ "out-of-order on-key values");
+}
+
+void DoRunNullByKeyPlanTest(const std::shared_ptr<Schema>& l_schema,
+ const std::shared_ptr<Schema>& r_schema) {
+ AsofJoinNodeOptions join_options{"time", "key2", 1000};
+ std::string expected_error_str = "unexpected null by-key values";
+ int n_rows = 5;
+ std::string l_str = GetJsonString(n_rows, l_schema->num_fields());
+ std::string r_str = GetJsonString(n_rows, r_schema->num_fields());
+ BatchesWithSchema l_batches = MakeBatchesFromNumString(l_schema, {l_str});
+ BatchesWithSchema r_batches = MakeBatchesFromNumString(r_schema, {r_str});
+ l_batches = MutateByKey(l_batches, "key", "key2", true, true);
+ r_batches = MutateByKey(r_batches, "key", "key2", true, true);
+
+ return DoInvalidPlanTest(l_batches, r_batches, join_options,
expected_error_str,
+ /*then_run_plan=*/true);
}
+struct BasicTestTypes {
+ std::shared_ptr<DataType> time, key, l_val, r0_val, r1_val;
+};
+
+struct BasicTest {
+ BasicTest(const std::vector<util::string_view>& l_data,
+ const std::vector<util::string_view>& r0_data,
+ const std::vector<util::string_view>& r1_data,
+ const std::vector<util::string_view>& exp_nokey_data,
+ const std::vector<util::string_view>& exp_data, int64_t tolerance)
+ : l_data(std::move(l_data)),
+ r0_data(std::move(r0_data)),
+ r1_data(std::move(r1_data)),
+ exp_nokey_data(std::move(exp_nokey_data)),
+ exp_data(std::move(exp_data)),
+ tolerance(tolerance) {}
+
+ template <typename TypeCond>
+ static inline void init_types(const std::vector<std::shared_ptr<DataType>>&
all_types,
+ std::vector<std::shared_ptr<DataType>>& types,
+ TypeCond type_cond) {
+ if (types.size() == 0) {
+ for (auto type : all_types) {
+ if (type_cond(type)) {
+ types.push_back(type);
+ }
+ }
+ }
+ }
+
+ void RunSingleByKey(std::vector<std::shared_ptr<DataType>> time_types = {},
+ std::vector<std::shared_ptr<DataType>> key_types = {},
+ std::vector<std::shared_ptr<DataType>> l_types = {},
+ std::vector<std::shared_ptr<DataType>> r0_types = {},
+ std::vector<std::shared_ptr<DataType>> r1_types = {}) {
+ using B = BatchesWithSchema;
+ RunBatches([this](B l_batches, B r0_batches, B r1_batches, B
exp_nokey_batches,
+ B exp_batches) {
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time",
"key",
+ tolerance);
+ });
+ }
+ void RunDoubleByKey(std::vector<std::shared_ptr<DataType>> time_types = {},
+ std::vector<std::shared_ptr<DataType>> key_types = {},
+ std::vector<std::shared_ptr<DataType>> l_types = {},
+ std::vector<std::shared_ptr<DataType>> r0_types = {},
+ std::vector<std::shared_ptr<DataType>> r1_types = {}) {
+ using B = BatchesWithSchema;
+ RunBatches([this](B l_batches, B r0_batches, B r1_batches, B
exp_nokey_batches,
+ B exp_batches) {
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time",
+ {"key", "key"}, tolerance);
+ });
+ }
+ void RunMutateByKey(std::vector<std::shared_ptr<DataType>> time_types = {},
+ std::vector<std::shared_ptr<DataType>> key_types = {},
+ std::vector<std::shared_ptr<DataType>> l_types = {},
+ std::vector<std::shared_ptr<DataType>> r0_types = {},
+ std::vector<std::shared_ptr<DataType>> r1_types = {}) {
+ using B = BatchesWithSchema;
+ RunBatches([this](B l_batches, B r0_batches, B r1_batches, B
exp_nokey_batches,
+ B exp_batches) {
+ l_batches = MutateByKey(l_batches, "key", "key2");
+ r0_batches = MutateByKey(r0_batches, "key", "key2");
+ r1_batches = MutateByKey(r1_batches, "key", "key2");
+ exp_batches = MutateByKey(exp_batches, "key", "key2");
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_batches, "time",
+ {"key", "key2"}, tolerance);
+ });
+ }
+ void RunMutateNoKey(std::vector<std::shared_ptr<DataType>> time_types = {},
+ std::vector<std::shared_ptr<DataType>> key_types = {},
+ std::vector<std::shared_ptr<DataType>> l_types = {},
+ std::vector<std::shared_ptr<DataType>> r0_types = {},
+ std::vector<std::shared_ptr<DataType>> r1_types = {}) {
+ using B = BatchesWithSchema;
+ RunBatches([this](B l_batches, B r0_batches, B r1_batches, B
exp_nokey_batches,
+ B exp_batches) {
+ l_batches = MutateByKey(l_batches, "key", "key2", true);
+ r0_batches = MutateByKey(r0_batches, "key", "key2", true);
+ r1_batches = MutateByKey(r1_batches, "key", "key2", true);
+ exp_nokey_batches = MutateByKey(exp_nokey_batches, "key", "key2", true);
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_nokey_batches,
"time", "key2",
+ tolerance);
+ });
+ }
+ void RunMutateNullKey(std::vector<std::shared_ptr<DataType>> time_types = {},
+ std::vector<std::shared_ptr<DataType>> key_types = {},
+ std::vector<std::shared_ptr<DataType>> l_types = {},
+ std::vector<std::shared_ptr<DataType>> r0_types = {},
+ std::vector<std::shared_ptr<DataType>> r1_types = {}) {
+ using B = BatchesWithSchema;
+ RunBatches([this](B l_batches, B r0_batches, B r1_batches, B
exp_nokey_batches,
+ B exp_batches) {
+ l_batches = MutateByKey(l_batches, "key", "key2", true, true);
+ r0_batches = MutateByKey(r0_batches, "key", "key2", true, true);
+ r1_batches = MutateByKey(r1_batches, "key", "key2", true, true);
+ exp_nokey_batches = MutateByKey(exp_nokey_batches, "key", "key2", true,
true);
+ CheckRunOutput(l_batches, r0_batches, r1_batches, exp_nokey_batches,
+ AsofJoinNodeOptions("time", "key2", tolerance,
+ /*nullable_by_key=*/true));
+ });
+ }
+ template <typename BatchesRunner>
+ void RunBatches(BatchesRunner batches_runner,
+ std::vector<std::shared_ptr<DataType>> time_types = {},
+ std::vector<std::shared_ptr<DataType>> key_types = {},
+ std::vector<std::shared_ptr<DataType>> l_types = {},
+ std::vector<std::shared_ptr<DataType>> r0_types = {},
+ std::vector<std::shared_ptr<DataType>> r1_types = {}) {
+ std::vector<std::shared_ptr<DataType>> all_types = {
+ utf8(),
+ large_utf8(),
+ binary(),
+ large_binary(),
+ int8(),
+ int16(),
+ int32(),
+ int64(),
+ uint8(),
+ uint16(),
+ uint32(),
+ uint64(),
+ date32(),
+ date64(),
+ time32(TimeUnit::MILLI),
+ time32(TimeUnit::SECOND),
+ time64(TimeUnit::NANO),
+ time64(TimeUnit::MICRO),
+ timestamp(TimeUnit::NANO, "UTC"),
+ timestamp(TimeUnit::MICRO, "UTC"),
+ timestamp(TimeUnit::MILLI, "UTC"),
+ timestamp(TimeUnit::SECOND, "UTC"),
+ float32(),
+ float64()};
+ using T = const std::shared_ptr<DataType>;
+ // byte_width > 1 below allows fitting the tested data
+ init_types(all_types, time_types,
+ [](T& t) { return t->byte_width() > 1 && !is_floating(t->id());
});
+ ASSERT_NE(0, time_types.size());
+ init_types(all_types, key_types, [](T& t) { return !is_floating(t->id());
});
+ ASSERT_NE(0, key_types.size());
+ init_types(all_types, l_types, [](T& t) { return true; });
+ ASSERT_NE(0, l_types.size());
+ init_types(all_types, r0_types, [](T& t) { return t->byte_width() > 1; });
+ ASSERT_NE(0, r0_types.size());
+ init_types(all_types, r1_types, [](T& t) { return t->byte_width() > 1; });
+ ASSERT_NE(0, r1_types.size());
+
+    // sample a limited number of type-combinations to keep the running time reasonable
+ // the scoped-traces below help reproduce a test failure, should it happen
+ auto start_time = std::chrono::system_clock::now();
+ auto seed = start_time.time_since_epoch().count();
+ ARROW_SCOPED_TRACE("Types seed: ", seed);
+ std::default_random_engine engine(static_cast<unsigned int>(seed));
+ std::uniform_int_distribution<size_t> time_distribution(0,
time_types.size() - 1);
+ std::uniform_int_distribution<size_t> key_distribution(0, key_types.size()
- 1);
+ std::uniform_int_distribution<size_t> l_distribution(0, l_types.size() -
1);
+ std::uniform_int_distribution<size_t> r0_distribution(0, r0_types.size() -
1);
+ std::uniform_int_distribution<size_t> r1_distribution(0, r1_types.size() -
1);
+
+ for (int i = 0; i < 1000; i++) {
+ auto time_type = time_types[time_distribution(engine)];
+ ARROW_SCOPED_TRACE("Time type: ", *time_type);
+ auto key_type = key_types[key_distribution(engine)];
+ ARROW_SCOPED_TRACE("Key type: ", *key_type);
+ auto l_type = l_types[l_distribution(engine)];
+ ARROW_SCOPED_TRACE("Left type: ", *l_type);
+ auto r0_type = r0_types[r0_distribution(engine)];
+ ARROW_SCOPED_TRACE("Right-0 type: ", *r0_type);
+ auto r1_type = r1_types[r1_distribution(engine)];
+ ARROW_SCOPED_TRACE("Right-1 type: ", *r1_type);
+
+ RunTypes({time_type, key_type, l_type, r0_type, r1_type},
batches_runner);
+
+ auto end_time = std::chrono::system_clock::now();
+ std::chrono::duration<double> diff = end_time - start_time;
+ if (diff.count() > 2) {
+        std::cerr << "AsofJoin test reached time limit at iteration " << i << std::endl;
+ // this normally happens on slow CI systems, but is fine
+ break;
+ }
+ }
+ }
+ template <typename BatchesRunner>
+ void RunTypes(BasicTestTypes basic_test_types, BatchesRunner batches_runner)
{
+ const BasicTestTypes& b = basic_test_types;
+ auto l_schema =
+ schema({field("time", b.time), field("key", b.key), field("l_v0",
b.l_val)});
+ auto r0_schema =
+ schema({field("time", b.time), field("key", b.key), field("r0_v0",
b.r0_val)});
+ auto r1_schema =
+ schema({field("time", b.time), field("key", b.key), field("r1_v0",
b.r1_val)});
+
+ auto exp_schema = schema({
+ field("time", b.time),
+ field("key", b.key),
+ field("l_v0", b.l_val),
+ field("r0_v0", b.r0_val),
+ field("r1_v0", b.r1_val),
+ });
+
+ // Test three table join
+ BatchesWithSchema l_batches, r0_batches, r1_batches, exp_nokey_batches,
exp_batches;
+ l_batches = MakeBatchesFromNumString(l_schema, l_data);
+ r0_batches = MakeBatchesFromNumString(r0_schema, r0_data);
+ r1_batches = MakeBatchesFromNumString(r1_schema, r1_data);
+ exp_nokey_batches = MakeBatchesFromNumString(exp_schema, exp_nokey_data);
+ exp_batches = MakeBatchesFromNumString(exp_schema, exp_data);
+ batches_runner(l_batches, r0_batches, r1_batches, exp_nokey_batches,
exp_batches);
+ }
+
+ std::vector<util::string_view> l_data;
+ std::vector<util::string_view> r0_data;
+ std::vector<util::string_view> r1_data;
+ std::vector<util::string_view> exp_nokey_data;
+ std::vector<util::string_view> exp_data;
+ int64_t tolerance;
+};
+
class AsofJoinTest : public testing::Test {};
-TEST(AsofJoinTest, TestBasic1) {
+#define ASOFJOIN_TEST_SET(name, num) \
Review Comment:
Do you think it would be possible to make this a parameterized test instead
of a macro?
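Not a blocker, but roughly what a value-parameterized version could look like (the fixture and the `GetBasicTest` helper are hypothetical):
```
#include <gtest/gtest.h>

// Hypothetical fixture; GetBasicTest(i) would build the BasicTest inputs that
// the ASOFJOIN_TEST_SET macro currently selects by number.
class AsofJoinBasicTest : public ::testing::TestWithParam<int> {};

TEST_P(AsofJoinBasicTest, SingleByKey) {
  // BasicTest basic_test = GetBasicTest(GetParam());
  // basic_test.RunSingleByKey();
}

INSTANTIATE_TEST_SUITE_P(AsofJoinNodeTest, AsofJoinBasicTest, ::testing::Range(1, 9));
```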
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]