save-buffer commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r834589559



##########
File path: cpp/src/arrow/compute/exec/tpch_node_test.cc
##########
@@ -0,0 +1,631 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/tpch_node.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/pcg_random.h"
+#include "arrow/util/thread_pool.h"
+
+#include <cctype>
+#include <regex>
+#include <string>
+#include <unordered_set>
+
+namespace arrow {
+namespace compute {
+namespace internal {
+static constexpr uint32_t kStartDate =
+    8035;  // January 1, 1992 is 8035 days after January 1, 1970
+static constexpr uint32_t kEndDate =
+    10591;  // December 31, 1998 is 10591 days after January 1, 1970
+
+// Checks every column of the batch: buffer 0 (the validity bitmap) must be absent,
+// i.e. the column carries no nulls, and the column must pass Arrow's full validation.
+void ValidateBatch(const ExecBatch& batch) {
+  for (const Datum& column : batch.values) {
+    const auto& validity_bitmap = column.array()->buffers[0];
+    ASSERT_EQ(validity_bitmap.get(), nullptr);
+    const auto as_array = column.make_array();
+    ASSERT_OK(as_array->ValidateFull());
+  }
+}
+
+// Records every 32-bit key of `d` in `seen`, failing if a key was already present
+// (each key must appear exactly once) or lies outside the inclusive range [min, max].
+void VerifyUniqueKey(std::unordered_set<int32_t>* seen, const Datum& d, int32_t min,
+                     int32_t max) {
+  const auto* first = reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  const int32_t* const last = first + d.length();
+  for (const int32_t* key = first; key != last; ++key) {
+    const bool newly_inserted = seen->insert(*key).second;
+    ASSERT_TRUE(newly_inserted);
+    ASSERT_LE(*key, max);
+    ASSERT_GE(*key, min);
+  }
+}
+
+// Verifies that `row` is `prefix` immediately followed by a decimal number.
+//   row            - the string to check; parsing of the number stops at the end of
+//                    the view or at the first NUL byte, whichever comes first
+//   prefix         - NUL-terminated text the row must start with
+//   i              - row index, used to index `nums` and in the failure message
+//   nums           - optional array of expected numbers; not consulted when null
+//   verify_padding - when true, require the numeric part to span at least 9 characters
+void VerifyStringAndNumber_Single(const util::string_view& row, const char* prefix,
+                                  const int64_t i, const int32_t* nums,
+                                  bool verify_padding) {
+  const size_t num_offset = std::strlen(prefix);
+  ASSERT_TRUE(row.starts_with(prefix)) << row << ", prefix=" << prefix << ", i=" << i;
+  int64_t num = 0;
+  size_t ibyte = num_offset;
+  // Parse the number out. The bounds check comes first so that the byte just past
+  // the end of the view is never read.
+  for (; ibyte < row.size() && row[ibyte] != '\0'; ibyte++) {
+    // std::isdigit is undefined for negative arguments, so go through unsigned char.
+    ASSERT_TRUE(std::isdigit(static_cast<unsigned char>(row[ibyte])));
+    num = num * 10 + (row[ibyte] - '0');
+  }
+  // If nums is not null, ensure it matches the parsed number
+  if (nums) {
+    ASSERT_EQ(static_cast<int32_t>(num), nums[i]);
+  }
+  // TPC-H only ever requires padding up to 9 digits, so we ensure that the numeric
+  // part of the string was at least 9 characters (could be more for bigger numbers).
+  if (verify_padding) {
+    int64_t num_chars = static_cast<int64_t>(ibyte - num_offset);
+    ASSERT_GE(num_chars, 9);
+  }
+}
+
+// Checks a fixed-width string column row by row: every `byte_width`-byte row must be
+// `prefix` followed by a number. When `numbers` holds a value (its kind is not
+// Datum::NONE) it must have the same length as `strings`, and the number parsed from
+// each row must equal the corresponding 32-bit entry. `verify_padding` is forwarded to
+// VerifyStringAndNumber_Single, which then also checks the width of the numeric part.
+void VerifyStringAndNumber_FixedWidth(const Datum& strings, const Datum& numbers,
+                                      int byte_width, const char* prefix,
+                                      bool verify_padding = true) {
+  const int64_t num_rows = strings.length();
+  const char* cursor =
+      reinterpret_cast<const char*>(strings.array()->buffers[1]->data());
+
+  const int32_t* expected = nullptr;
+  if (numbers.kind() != Datum::NONE) {
+    ASSERT_EQ(num_rows, numbers.length());
+    expected = reinterpret_cast<const int32_t*>(numbers.array()->buffers[1]->data());
+  }
+
+  // Walk the column one fixed-size row at a time.
+  for (int64_t row = 0; row < num_rows; ++row, cursor += byte_width) {
+    const util::string_view text(cursor, byte_width);
+    VerifyStringAndNumber_Single(text, prefix, row, expected, verify_padding);
+  }
+}
+
+// Variable-length counterpart of VerifyStringAndNumber_FixedWidth: row boundaries are
+// taken from the 32-bit offsets in buffer 1 and the character data from buffer 2.
+void VerifyStringAndNumber_Varlen(const Datum& strings, const Datum& numbers,
+                                  const char* prefix, bool verify_padding = true) {
+  const int64_t num_rows = strings.length();
+  const auto* row_offsets =
+      reinterpret_cast<const int32_t*>(strings.array()->buffers[1]->data());
+  const auto* chars =
+      reinterpret_cast<const char*>(strings.array()->buffers[2]->data());
+
+  const int32_t* expected = nullptr;
+  if (numbers.kind() != Datum::NONE) {
+    ASSERT_EQ(num_rows, numbers.length());
+    expected = reinterpret_cast<const int32_t*>(numbers.array()->buffers[1]->data());
+  }
+
+  for (int64_t row = 0; row < num_rows; ++row) {
+    const int32_t begin = row_offsets[row];
+    const int32_t row_len = row_offsets[row + 1] - row_offsets[row];
+    const util::string_view text(chars + begin, row_len);
+    VerifyStringAndNumber_Single(text, prefix, row, expected, verify_padding);
+  }
+}
+
+// Verifies that each row is a V-string, which is defined in the spec to be
+// a string of random length between min_length and max_length, that is composed
+// of alphanumeric characters, commas, or spaces.
+//   d          - variable-length string column (32-bit offsets read from buffer 1,
+//                character data from buffer 2)
+//   min_length - smallest permitted row length, inclusive
+//   max_length - largest permitted row length, inclusive
+void VerifyVString(const Datum& d, int min_length, int max_length) {
+  int64_t length = d.length();
+  const int32_t* off = reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const char*>(d.array()->buffers[2]->data());
+  for (int64_t i = 0; i < length; i++) {
+    int32_t start = off[i];
+    int32_t end = off[i + 1];
+    int32_t str_len = end - start;
+    ASSERT_LE(str_len, max_length);
+    ASSERT_GE(str_len, min_length);
+    // A distinct index name keeps the character loop from shadowing the row index.
+    for (int32_t j = start; j < end; j++) {
+      // The <cctype> classifiers are undefined for negative arguments, so convert to
+      // unsigned char before calling them.
+      const unsigned char c = static_cast<unsigned char>(str[j]);
+      bool is_valid = std::isdigit(c) || std::isalpha(c) || c == ',' || c == ' ';
+      ASSERT_TRUE(is_valid) << "Character " << str[j]
+                            << " is not a digit, a letter, a comma, or a space";
+    }
+  }
+}
+
+// Checks that, for every 32-bit element, the remainder of dividing it by "mod" lies in
+// the inclusive range [min, max].
+void VerifyModuloBetween(const Datum& d, int32_t min, int32_t max, int32_t mod) {
+  const int64_t count = d.length();
+  const auto* values = reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  for (int64_t idx = 0; idx < count; ++idx) {
+    const int32_t value = values[idx];
+    const int32_t remainder = value % mod;
+    ASSERT_GE(remainder, min) << "Value must be between " << min << " and " << max
+                              << " mod " << mod << ", " << value << " % " << mod
+                              << " = " << remainder;
+    ASSERT_LE(remainder, max) << "Value must be between " << min << " and " << max
+                              << " mod " << mod << ", " << value << " % " << mod
+                              << " = " << remainder;
+  }
+}
+
+// Checks that every 32-bit element of the column lies in the inclusive range
+// [min, max]; the lower bound is tested first.
+void VerifyAllBetween(const Datum& d, int32_t min, int32_t max) {
+  const int64_t count = d.length();
+  const auto* first = reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  for (const int32_t* it = first; it != first + count; ++it) {
+    const int32_t value = *it;
+    ASSERT_GE(value, min) << "Value must be between " << min << " and " << max
+                          << ", got " << value;
+    ASSERT_LE(value, max) << "Value must be between " << min << " and " << max
+                          << ", got " << value;
+  }
+}
+
+// Nation keys must fall in the inclusive range [0, 24].
+void VerifyNationKey(const Datum& d) {
+  constexpr int32_t kMinNationKey = 0;
+  constexpr int32_t kMaxNationKey = 24;
+  VerifyAllBetween(d, kMinNationKey, kMaxNationKey);
+}
+
+// Checks that every 15-byte row of the column matches the phone number layout
+// NN-NNN-NNN-NNNN, where each N is a digit.
+void VerifyPhone(const Datum& d) {
+  constexpr int kByteWidth = 15;  // This is common for all PHONE columns
+  const int64_t num_rows = d.length();
+  const char* row_begin = reinterpret_cast<const char*>(d.array()->buffers[1]->data());
+  const std::regex phone_pattern("\\d{2}-\\d{3}-\\d{3}-\\d{4}");
+  for (int64_t row = 0; row < num_rows; ++row, row_begin += kByteWidth) {
+    const char* row_end = row_begin + kByteWidth;
+    const bool matches = std::regex_match(row_begin, row_end, phone_pattern);
+    ASSERT_TRUE(matches);
+  }
+}
+
+// Checks that every Decimal128 element, once converted to int64_t, lies in the
+// inclusive range [min, max]; the upper bound is tested first.
+void VerifyDecimalsBetween(const Datum& d, int64_t min, int64_t max) {
+  const int64_t count = d.length();
+  const auto* first =
+      reinterpret_cast<const Decimal128*>(d.array()->buffers[1]->data());
+  for (const Decimal128* it = first; it != first + count; ++it) {
+    const int64_t as_int64 = static_cast<int64_t>(*it);
+    ASSERT_LE(as_int64, max);
+    ASSERT_GE(as_int64, min);
+  }
+}
+
+// Verifies that each variable-length row is a series of words separated by
+// spaces. Number of words is determined by the number of spaces.
+//   d         - variable-length string column (32-bit offsets read from buffer 1,
+//               character data from buffer 2)
+//   num_words - expected word count; each row must contain exactly num_words - 1
+//               spaces and otherwise only letters
+void VerifyCorrectNumberOfWords_Varlen(const Datum& d, int num_words) {
+  int expected_num_spaces = num_words - 1;
+  int64_t length = d.length();
+  const int32_t* offsets =
+      reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const char*>(d.array()->buffers[2]->data());
+
+  for (int64_t i = 0; i < length; i++) {
+    int32_t start = offsets[i];
+    int32_t end = offsets[i + 1];
+    // View the row in place instead of copying it into a fixed-size stack buffer, so
+    // rows of any length can be inspected and printed without overflowing anything.
+    const util::string_view row(str + start, end - start);
+
+    int actual_num_spaces = 0;
+    bool is_only_alphas_or_spaces = true;
+    for (const char c : row) {
+      bool is_space = c == ' ';
+      actual_num_spaces += is_space;
+      // std::isalpha is undefined for negative arguments, hence the unsigned char cast.
+      is_only_alphas_or_spaces &=
+          (is_space || std::isalpha(static_cast<unsigned char>(c)));
+    }
+    ASSERT_TRUE(is_only_alphas_or_spaces)
+        << "Words must be composed only of letters, got " << row;
+    ASSERT_EQ(actual_num_spaces, expected_num_spaces)
+        << "Wrong number of spaces in " << row;
+  }
+}
+
+// Fixed-width counterpart of VerifyCorrectNumberOfWords_Varlen: each row must consist
+// only of letters and spaces and contain exactly num_words - 1 spaces.
+//   d          - fixed-width string column whose bytes are read from buffer 1
+//   num_words  - expected number of words per row
+//   byte_width - number of bytes occupied by each row; scanning of a row stops early
+//                at the first NUL byte
+void VerifyCorrectNumberOfWords_FixedWidth(const Datum& d, int num_words,
+                                           int byte_width) {
+  int expected_num_spaces = num_words - 1;
+  int64_t length = d.length();
+  const char* str = reinterpret_cast<const char*>(d.array()->buffers[1]->data());
+
+  for (int64_t i = 0; i < length; i++) {
+    int actual_num_spaces = 0;
+    const char* row = str + i * byte_width;
+    bool is_only_alphas_or_spaces = true;
+    int32_t row_len = 0;
+    for (; row_len < byte_width && row[row_len]; row_len++) {
+      bool is_space = row[row_len] == ' ';
+      actual_num_spaces += is_space;
+      // std::isalpha is undefined for negative arguments, hence the unsigned char cast.
+      is_only_alphas_or_spaces &=
+          (is_space || std::isalpha(static_cast<unsigned char>(row[row_len])));
+    }
+    // A row that fills all byte_width bytes has no NUL terminator, so print it
+    // through a length-bounded view rather than as a C string.
+    const util::string_view row_view(row, row_len);
+    ASSERT_TRUE(is_only_alphas_or_spaces)
+        << "Words must be composed only of letters, got " << row_view;
+    ASSERT_EQ(actual_num_spaces, expected_num_spaces)
+        << "Wrong number of spaces in " << row_view;
+  }
+}
+
+// Checks that every one-byte row of the column is a member of `possibilities`.
+void VerifyOneOf(const Datum& d, const std::unordered_set<char>& possibilities) {
+  const int64_t num_rows = d.length();
+  const auto* values = reinterpret_cast<const char*>(d.array()->buffers[1]->data());
+  for (int64_t row = 0; row < num_rows; ++row) {
+    const bool is_allowed = possibilities.count(values[row]) != 0;
+    ASSERT_TRUE(is_allowed);
+  }
+}
+
+// Verifies that each fixed-width row is one of the possibilities.
+//   d             - fixed-width string column whose bytes are read from buffer 1
+//   byte_width    - number of bytes occupied by each row
+//   possibilities - the accepted values; a row is compared up to its first NUL byte,
+//                   or in full when it contains none
+void VerifyOneOf(const Datum& d, int32_t byte_width,
+                 const std::unordered_set<util::string_view>& possibilities) {
+  int64_t length = d.length();
+  const char* col = reinterpret_cast<const char*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++) {
+    const char* row = col + i * byte_width;
+    int32_t row_len = 0;
+    // Test the bound before dereferencing so that a row occupying all byte_width
+    // bytes never causes a read of the byte past its end.
+    while (row_len < byte_width && row[row_len]) row_len++;
+    util::string_view view(row, row_len);
+    ASSERT_TRUE(possibilities.find(view) != possibilities.end())
+        << view << " is not a valid string.";
+  }
+}
+
+// Tallies occurrences: for each 32-bit value in `d`, increments that value's entry in
+// `counts` (operator[] creates the entry on first sight).
+void CountInstances(std::unordered_map<int32_t, int32_t>& counts, const Datum& d) {
+  const int64_t total = d.length();
+  const auto* values = reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  for (int64_t idx = 0; idx < total; ++idx) {
+    const int32_t value = values[idx];
+    counts[value]++;
+  }
+}
+
+// For the S_COMMENT column, some of the rows must be modified to contain
+// "Customer...Complaints" or "Customer...Recommends". This function counts the number
+// of good and bad comments.
+//   d          - variable-length string column (32-bit offsets read from buffer 1,
+//                character data from buffer 2)
+//   good_count - incremented for every row containing "Customer" and "Recommends"
+//   bad_count  - incremented for every row containing "Customer" and "Complaints"
+// A row containing "Customer" must contain exactly one of the two other markers.
+void CountModifiedComments(const Datum& d, int& good_count, int& bad_count) {
+  int64_t length = d.length();
+  const int32_t* offsets =
+      reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const char*>(d.array()->buffers[2]->data());
+  for (int64_t i = 0; i < length; i++) {
+    // Search the row through a length-bounded view rather than copying it into a
+    // fixed 101-byte buffer, which any row longer than 100 bytes would overflow.
+    const util::string_view row(str + offsets[i], offsets[i + 1] - offsets[i]);
+    if (row.find("Customer") == util::string_view::npos) continue;
+
+    const bool has_recommends = row.find("Recommends") != util::string_view::npos;
+    const bool has_complaints = row.find("Complaints") != util::string_view::npos;
+    ASSERT_TRUE(has_recommends != has_complaints);
+    if (has_recommends) good_count++;
+    if (has_complaints) bad_count++;
+  }
+}
+
+TEST(TpchNode, ScaleFactor) {
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  std::shared_ptr<ExecPlan> plan = *ExecPlan::Make(&ctx);
+  TpchGen gen = *TpchGen::Make(plan.get(), 0.25f);
+  ExecNode* table = *gen.Supplier();
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+  Declaration sink("sink", {Declaration::Input(table)}, 
SinkNodeOptions{&sink_gen});
+  std::ignore = *sink.AddToPlan(plan.get());
+  auto fut = StartAndCollect(plan.get(), sink_gen);
+  auto res = *fut.MoveResult();

Review comment:
       I've factored this setup code into a function, so I think I can just get 
away with `return fut.MoveResult()` because the function returns a `Result`.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to