save-buffer commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r829345518



##########
File path: cpp/src/arrow/compute/exec/tpch_node_test.cc
##########
@@ -0,0 +1,623 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-matchers.h>
+
+#include "arrow/api.h"
+#include "arrow/array/validate.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/compute/exec/tpch_node.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/kernels/test_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+#include "arrow/testing/random.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/pcg_random.h"
+#include "arrow/util/thread_pool.h"
+
+#include <cctype>
+#include <string>
+#include <unordered_set>
+
+namespace arrow {
+namespace compute {
+static constexpr uint32_t kStartDate =
+    8035;  // January 1, 1992 is 8035 days after January 1, 1970
+static constexpr uint32_t kEndDate =
+    10591;  // December 12, 1998 is 10591 days after January 1, 1970
+
+void ValidateBatch(const ExecBatch& batch) {
+  for (const Datum& d : batch.values)
+    ASSERT_OK(arrow::internal::ValidateArray(*d.array()));
+}
+
+void VerifyUniqueKey(std::unordered_set<int32_t>& seen, const Datum& d, 
int32_t min,
+                     int32_t max) {
+  const int32_t* keys = reinterpret_cast<const 
int32_t*>(d.array()->buffers[1]->data());
+  int64_t num_keys = d.length();
+  for (int64_t i = 0; i < num_keys; i++) {
+    ASSERT_TRUE(seen.insert(keys[i]).second);
+    ASSERT_LE(keys[i], max);
+    ASSERT_GE(keys[i], min);
+  }
+}
+
+void VerifyStringAndNumber_Single(const char* row, const char* prefix, const 
int64_t i,
+                                  const int32_t* nums, int byte_width,
+                                  bool verify_padding) {
+  int num_offset = static_cast<int>(std::strlen(prefix));
+  ASSERT_EQ(std::memcmp(row, prefix, num_offset), 0)
+      << row << ", prefix=" << prefix << ", i=" << i;
+  const char* num_str = row + num_offset;
+  int64_t num = 0;
+  int ibyte = static_cast<int>(num_offset);
+  for (; *num_str && ibyte < byte_width; ibyte++) {
+    num *= 10;
+    ASSERT_TRUE(std::isdigit(*num_str));
+    num += *num_str++ - '0';
+  }
+  if (nums) {
+    ASSERT_EQ(static_cast<int32_t>(num), nums[i]);
+  }
+  if (verify_padding) {
+    int num_chars = ibyte - num_offset;
+    ASSERT_GE(num_chars, 9);
+  }
+}
+
+void VerifyStringAndNumber_FixedWidth(const Datum& strings, const Datum& 
numbers,
+                                      int byte_width, const char* prefix,
+                                      bool verify_padding = true) {
+  int64_t length = strings.length();
+  const char* str = reinterpret_cast<const 
char*>(strings.array()->buffers[1]->data());
+
+  const int32_t* nums = nullptr;
+  if (numbers.kind() != Datum::NONE) {
+    ASSERT_EQ(length, numbers.length());
+    nums = reinterpret_cast<const 
int32_t*>(numbers.array()->buffers[1]->data());
+  }
+
+  for (int64_t i = 0; i < length; i++) {
+    const char* row = str + i * byte_width;
+    VerifyStringAndNumber_Single(row, prefix, i, nums, byte_width, 
verify_padding);
+  }
+}
+
+void VerifyStringAndNumber_Varlen(const Datum& strings, const Datum& numbers,
+                                  const char* prefix, bool verify_padding = 
true) {
+  int64_t length = strings.length();
+  const int32_t* offsets =
+      reinterpret_cast<const int32_t*>(strings.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const 
char*>(strings.array()->buffers[2]->data());
+
+  const int32_t* nums = nullptr;
+  if (numbers.kind() != Datum::NONE) {
+    ASSERT_EQ(length, numbers.length());
+    nums = reinterpret_cast<const 
int32_t*>(numbers.array()->buffers[1]->data());
+  }
+
+  for (int64_t i = 0; i < length; i++) {
+    char tmp_str[256] = {};
+    int32_t start = offsets[i];
+    int32_t str_len = offsets[i + 1] - offsets[i];
+    std::memcpy(tmp_str, str + start, str_len);
+    VerifyStringAndNumber_Single(tmp_str, prefix, i, nums, sizeof(tmp_str),
+                                 verify_padding);
+  }
+}
+
+void VerifyVString(const Datum& d, int min_length, int max_length) {
+  int64_t length = d.length();
+  const int32_t* off = reinterpret_cast<const 
int32_t*>(d.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const 
char*>(d.array()->buffers[2]->data());
+  for (int64_t i = 0; i < length; i++) {
+    int32_t start = off[i];
+    int32_t end = off[i + 1];
+    int32_t str_len = end - start;
+    ASSERT_LE(str_len, max_length);
+    ASSERT_GE(str_len, min_length);
+    for (int32_t i = start; i < end; i++) {
+      bool is_valid =
+          std::isdigit(str[i]) || std::isalpha(str[i]) || str[i] == ',' || 
str[i] == ' ';
+      ASSERT_TRUE(is_valid) << "Character " << str[i]
+                            << " is not a digit, a letter, a comma, or a 
space";
+    }
+  }
+}
+
+void VerifyModuloBetween(const Datum& d, int32_t min, int32_t max, int32_t 
mod) {
+  int64_t length = d.length();
+  const int32_t* n = reinterpret_cast<const 
int32_t*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++) {
+    int32_t m = n[i] % mod;
+    ASSERT_GE(m, min) << "Value must be between " << min << " and " << max << 
" mod "
+                      << mod << ", " << n[i] << " % " << mod << " = " << m;
+    ASSERT_LE(m, max) << "Value must be between " << min << " and " << max << 
" mod "
+                      << mod << ", " << n[i] << " % " << mod << " = " << m;
+  }
+}
+
+void VerifyAllBetween(const Datum& d, int32_t min, int32_t max) {
+  int64_t length = d.length();
+  const int32_t* n = reinterpret_cast<const 
int32_t*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++) {
+    ASSERT_GE(n[i], min) << "Value must be between " << min << " and " << max 
<< ", got "
+                         << n[i];
+    ASSERT_LE(n[i], max) << "Value must be between " << min << " and " << max 
<< ", got "
+                         << n[i];
+  }
+}
+
+void VerifyNationKey(const Datum& d) { VerifyAllBetween(d, 0, 24); }
+
+void VerifyPhone(const Datum& d) {
+  int64_t length = d.length();
+  const char* phones = reinterpret_cast<const 
char*>(d.array()->buffers[1]->data());
+  constexpr int kByteWidth = 15;  // This is common for all PHONE columns
+  for (int64_t i = 0; i < length; i++) {
+    const char* row = phones + i * kByteWidth;
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_EQ(*row++, '-');
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_EQ(*row++, '-');
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_EQ(*row++, '-');
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+    ASSERT_TRUE(std::isdigit(*row++));
+  }
+}
+
+void VerifyDecimalsBetween(const Datum& d, int64_t min, int64_t max) {
+  int64_t length = d.length();
+  const Decimal128* decs =
+      reinterpret_cast<const Decimal128*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++) {
+    int64_t val = static_cast<int64_t>(decs[i]);
+    ASSERT_LE(val, max);
+    ASSERT_GE(val, min);
+  }
+}
+
+void VerifyCorrectNumberOfWords_Varlen(const Datum& d, int num_words) {
+  int expected_num_spaces = num_words - 1;
+  int64_t length = d.length();
+  const int32_t* offsets =
+      reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const 
char*>(d.array()->buffers[2]->data());
+
+  for (int64_t i = 0; i < length; i++) {
+    int actual_num_spaces = 0;
+
+    int32_t start = offsets[i];
+    int32_t end = offsets[i + 1];
+    int32_t str_len = end - start;
+    char tmp_str[256] = {};
+    std::memcpy(tmp_str, str + start, str_len);
+    bool is_only_alphas_or_spaces = true;
+    for (int32_t j = offsets[i]; j < offsets[i + 1]; j++) {
+      bool is_space = str[j] == ' ';
+      actual_num_spaces += is_space;
+      is_only_alphas_or_spaces &= (is_space || std::isalpha(str[j]));
+    }
+    ASSERT_TRUE(is_only_alphas_or_spaces)
+        << "Words must be composed only of letters, got " << tmp_str;
+    ASSERT_EQ(actual_num_spaces, expected_num_spaces)
+        << "Wrong number of spaces in " << tmp_str;
+  }
+}
+
+void VerifyCorrectNumberOfWords_FixedWidth(const Datum& d, int num_words,
+                                           int byte_width) {
+  int expected_num_spaces = num_words - 1;
+  int64_t length = d.length();
+  const char* str = reinterpret_cast<const 
char*>(d.array()->buffers[1]->data());
+
+  for (int64_t i = 0; i < length; i++) {
+    int actual_num_spaces = 0;
+    const char* row = str + i * byte_width;
+    bool is_only_alphas_or_spaces = true;
+    for (int32_t j = 0; j < byte_width && row[j]; j++) {
+      bool is_space = row[j] == ' ';
+      actual_num_spaces += is_space;
+      is_only_alphas_or_spaces &= (is_space || std::isalpha(row[j]));
+    }
+    ASSERT_TRUE(is_only_alphas_or_spaces)
+        << "Words must be composed only of letters, got " << row;
+    ASSERT_EQ(actual_num_spaces, expected_num_spaces)
+        << "Wrong number of spaces in " << row;
+  }
+}
+
+void VerifyOneOf(const Datum& d, const std::unordered_set<char>& 
possibilities) {
+  int64_t length = d.length();
+  const char* col = reinterpret_cast<const 
char*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++)
+    ASSERT_TRUE(possibilities.find(col[i]) != possibilities.end());
+}
+
+void VerifyOneOf(const Datum& d, int32_t byte_width,
+                 const std::unordered_set<std::string>& possibilities) {
+  int64_t length = d.length();
+  const char* col = reinterpret_cast<const 
char*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++) {
+    const char* row = col + i * byte_width;
+    char tmp_str[256] = {};
+    std::memcpy(tmp_str, row, byte_width);
+    ASSERT_TRUE(possibilities.find(tmp_str) != possibilities.end())
+        << tmp_str << " is not a valid string.";
+  }
+}
+
+void CountInstances(std::unordered_map<int32_t, int32_t>& counts, const Datum& 
d) {
+  int64_t length = d.length();
+  const int32_t* nums = reinterpret_cast<const 
int32_t*>(d.array()->buffers[1]->data());
+  for (int64_t i = 0; i < length; i++) counts[nums[i]]++;
+}
+
+void CountModifiedComments(const Datum& d, int& good_count, int& bad_count) {
+  int64_t length = d.length();
+  const int32_t* offsets =
+      reinterpret_cast<const int32_t*>(d.array()->buffers[1]->data());
+  const char* str = reinterpret_cast<const 
char*>(d.array()->buffers[2]->data());
+  // Length of S_COMMENT is at most 100
+  char tmp_string[101];
+  for (int64_t i = 0; i < length; i++) {
+    const char* row = str + offsets[i];
+    int32_t row_length = offsets[i + 1] - offsets[i];
+    std::memset(tmp_string, 0, sizeof(tmp_string));
+    std::memcpy(tmp_string, row, row_length);
+    char* customer = std::strstr(tmp_string, "Customer");
+    char* recommends = std::strstr(tmp_string, "Recommends");
+    char* complaints = std::strstr(tmp_string, "Complaints");
+    if (customer) {
+      ASSERT_TRUE((recommends != nullptr) ^ (complaints != nullptr));
+      if (recommends) good_count++;
+      if (complaints) bad_count++;
+    }
+  }
+}
+
+TEST(TpchNode, ScaleFactor) {
+  ExecContext ctx(default_memory_pool(), arrow::internal::GetCpuThreadPool());
+  std::shared_ptr<ExecPlan> plan = *ExecPlan::Make(&ctx);
+  TpchGen gen = *TpchGen::Make(plan.get(), 0.25f);
+  ExecNode* table = *gen.Supplier();
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+  Declaration sink("sink", {Declaration::Input(table)}, 
SinkNodeOptions{&sink_gen});
+  std::ignore = *sink.AddToPlan(plan.get());
+  auto fut = StartAndCollect(plan.get(), sink_gen);
+  auto res = *fut.MoveResult();
+
+  int64_t kExpectedRows = 2500;
+  int64_t num_rows = 0;
+  for (auto& batch : res) num_rows += batch.length;
+  ASSERT_EQ(num_rows, kExpectedRows);
+  arrow::internal::GetCpuThreadPool()->WaitForIdle();

Review comment:
       It was necessary at one point, but I think it's not anymore. I've deleted it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to