westonpace commented on a change in pull request #12537: URL: https://github.com/apache/arrow/pull/12537#discussion_r829466609
########## File path: cpp/src/arrow/compute/exec/tpch_node.h ########## @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> +#include <vector> +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/options.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/pcg_random.h" + +namespace arrow { +namespace compute { +class OrdersAndLineItemGenerator; +class PartAndPartSupplierGenerator; + +class ARROW_EXPORT TpchGen { + public: + /* + * \brief Create a factory for nodes that generate TPC-H data + * + * Note: Individual tables will reference each other. It is important that you only + * create a single TpchGen instance for each plan and then you can create nodes for each + * table from that single TpchGen instance. Note: Every batch will be scheduled as a new + * task using the ExecPlan's scheduler. + */ + static Result<TpchGen> Make(ExecPlan* plan, float scale_factor = 1.0f, + int64_t batch_size = 4096); Review comment: There's actually nothing preventing us from doing the two batch-size approach today: ``` void OutputBatchCallback(ExecBatch batch) { for (const auto& small_batch : Partition(batch, small_batch_size)) { outputs_[0]->InputReceived(this, small_batch); } } ``` However: * The vector/shared_ptr allocations introduced by slicing will fight the benefits * The filter and project nodes right now are unfortunately creating a thread task for every batch which will sort of defeat the purpose. I agree this is best left for a follow-up. @sanjibansg is doing some experiments here over the next month or two as part of an internship project. ########## File path: cpp/src/arrow/compute/exec/tpch_node.h ########## @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> +#include <vector> +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/options.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/pcg_random.h" + +namespace arrow { +namespace compute { Review comment: I agree we want users to use this but I don't think that means we will need the node in a public namespace. Eventually, we will add function registry and Substrait integrations. At that point we will probably create some kind of "options" object which will be in the public API. At that point the nodes could even be in anonymous namespace, however I think the direct construction is necessary now and will probably still be useful in the future for unit testing. I think `arrow::compute::internal` is reasonable. ########## File path: cpp/src/arrow/compute/exec/tpch_node_test.cc ########## @@ -0,0 +1,623 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gmock/gmock-matchers.h> + +#include "arrow/api.h" +#include "arrow/array/validate.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/compute/exec/tpch_node.h" +#include "arrow/compute/exec/util.h" +#include "arrow/compute/kernels/row_encoder.h" +#include "arrow/compute/kernels/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/pcg_random.h" +#include "arrow/util/thread_pool.h" + +#include <cctype> +#include <string> +#include <unordered_set> + +namespace arrow { +namespace compute { +static constexpr uint32_t kStartDate = + 8035; // January 1, 1992 is 8035 days after January 1, 1970 +static constexpr uint32_t kEndDate = + 10591; // December 12, 1998 is 10591 days after January 1, 1970 + +void ValidateBatch(const ExecBatch& batch) { + for (const Datum& d : batch.values) + ASSERT_OK(arrow::internal::ValidateArray(*d.array())); +} + +void VerifyUniqueKey(std::unordered_set<int32_t>& seen, const Datum& d, int32_t min, + int32_t max) { Review comment: This is a relatively recent introduction to the style guide (sadly it is not versioned) that we chose not to adopt: https://issues.apache.org/jira/browse/ARROW-14960 We really should add this fact to the C++ development guidelines. I will make a reminder to check it this evening. ########## File path: cpp/src/arrow/compute/exec/tpch_node.cc ########## @@ -0,0 +1,3431 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/tpch_node.h" +#include "arrow/util/future.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/unreachable.h" + +#include <algorithm> +#include <bitset> +#include <cstring> +#include <memory> +#include <mutex> +#include <queue> +#include <random> +#include <unordered_set> +#include <vector> + +namespace arrow { +using internal::checked_cast; + +namespace compute { +const char* NameParts[] = { + "almond", "antique", "aquamarine", "azure", "beige", "bisque", + "black", "blanched", "blue", "blush", "brown", "burlywood", + "burnished", "chartreuse", "chiffon", "chocolate", "coral", "cornflower", + "cornsilk", "cream", "cyan", "dark", "deep", "dim", + "dodger", "drab", "firebrick", "floral", "forest", "frosted", + "gainsboro", "ghost", "goldenrod", "green", "grey", "honeydew", + "hot", "indian", "ivory", "khaki", "lace", "lavender", + "lawn", "lemon", "light", "lime", "linen", "magenta", + "maroon", "medium", "metallic", "midnight", "mint", "misty", + "moccasin", "navajo", "navy", "olive", "orange", "orchid", + "pale", "papaya", "peach", "peru", "pink", "plum", + "powder", "puff", "purple", "red", "rose", "rosy", + "royal", "saddle", "salmon", "sandy", "seashell", "sienna", + "sky", "slate", "smoke", "snow", "spring", "steel", + "tan", "thistle", "tomato", "turquoise", "violet", "wheat", + "white", "yellow", +}; +static constexpr size_t kNumNameParts = sizeof(NameParts) / sizeof(NameParts[0]); Review comment: Does heap allocation matter for objects with static storage duration? (this is not a rhetorical question, I honestly don't know the answer :) ) ########## File path: cpp/src/arrow/compute/exec/tpch_node.cc ########## @@ -0,0 +1,3431 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/exec/tpch_node.h" +#include "arrow/util/future.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/unreachable.h" + +#include <algorithm> +#include <bitset> +#include <cstring> +#include <memory> +#include <mutex> +#include <queue> +#include <random> +#include <unordered_set> +#include <vector> + +namespace arrow { +using internal::checked_cast; + +namespace compute { +const char* NameParts[] = { + "almond", "antique", "aquamarine", "azure", "beige", "bisque", + "black", "blanched", "blue", "blush", "brown", "burlywood", + "burnished", "chartreuse", "chiffon", "chocolate", "coral", "cornflower", + "cornsilk", "cream", "cyan", "dark", "deep", "dim", + "dodger", "drab", "firebrick", "floral", "forest", "frosted", + "gainsboro", "ghost", "goldenrod", "green", "grey", "honeydew", + "hot", "indian", "ivory", "khaki", "lace", "lavender", + "lawn", "lemon", "light", "lime", "linen", "magenta", + "maroon", "medium", "metallic", "midnight", "mint", "misty", + "moccasin", "navajo", "navy", "olive", "orange", "orchid", + "pale", "papaya", "peach", "peru", "pink", "plum", + "powder", "puff", "purple", "red", "rose", "rosy", + "royal", "saddle", "salmon", "sandy", "seashell", "sienna", + "sky", "slate", "smoke", "snow", "spring", "steel", + "tan", "thistle", "tomato", "turquoise", "violet", "wheat", + "white", "yellow", +}; +static constexpr size_t kNumNameParts = sizeof(NameParts) / sizeof(NameParts[0]); + +const char* Types_1[] = { + "STANDARD ", "SMALL ", "MEDIUM ", "LARGE ", "ECONOMY ", "PROMO ", +}; +static constexpr size_t kNumTypes_1 = sizeof(Types_1) / sizeof(Types_1[0]); + +const char* Types_2[] = { + "ANODIZED ", "BURNISHED ", "PLATED ", "POLISHED ", "BRUSHED ", +}; +static constexpr size_t kNumTypes_2 = sizeof(Types_2) / sizeof(Types_2[0]); + +const char* Types_3[] = { + "TIN", "NICKEL", "BRASS", "STEEL", "COPPER", +}; +static constexpr size_t kNumTypes_3 = sizeof(Types_3) / sizeof(Types_3[0]); + +const char* Containers_1[] = { + "SM ", "LG ", "MD ", "JUMBO ", "WRAP ", +}; +static constexpr size_t kNumContainers_1 = sizeof(Containers_1) / sizeof(Containers_1[0]); + +const char* Containers_2[] = { + "CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM", +}; +static constexpr size_t kNumContainers_2 = sizeof(Containers_2) / sizeof(Containers_2[0]); + +const char* Segments[] = { + "AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD", +}; +static constexpr size_t kNumSegments = sizeof(Segments) / sizeof(Segments[0]); + +const char* Priorities[] = { + "1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW", +}; +static constexpr size_t kNumPriorities = sizeof(Priorities) / sizeof(Priorities[0]); + +const char* Instructions[] = { + "DELIVER IN PERSON", + "COLLECT COD", + "NONE", + "TAKE BACK RETURN", +}; +static constexpr size_t kNumInstructions = sizeof(Instructions) / sizeof(Instructions[0]); + +const char* Modes[] = { + "REG AIR", "AIR", "RAIL", "SHIP", "TRUCK", "MAIL", "FOB", +}; +static constexpr size_t kNumModes = sizeof(Modes) / sizeof(Modes[0]); + +const char* Nouns[] = { + "foxes ", "ideas ", "theodolites ", "pinto beans ", "instructions ", + "dependencies ", "excuses ", "platelets ", "asymptotes ", "courts ", + "dolphins ", "multipliers ", "sautemes ", "warthogs ", "frets ", + "dinos ", "attainments ", "somas ", "Tiresias '", "patterns ", + "forges ", "braids ", "hockey players ", "frays ", "warhorses ", + "dugouts ", "notomis ", "epitaphs ", "pearls ", "tithes ", + "waters ", "orbits ", "gifts ", "sheaves ", "depths ", + "sentiments ", "decoys ", "realms ", "pains ", "grouches ", + "escapades ", +}; +static constexpr size_t kNumNouns = sizeof(Nouns) / sizeof(Nouns[0]); + +const char* Verbs[] = { + "sleep ", "wake ", "are ", "cajole ", "haggle ", "nag ", "use ", + "boost ", "affix ", "detect ", "integrate ", "maintain ", "nod ", "was ", + "lose ", "sublate ", "solve ", "thrash ", "promise ", "engage ", "hinder ", + "print ", "x-ray ", "breach ", "eat ", "grow ", "impress ", "mold ", + "poach ", "serve ", "run ", "dazzle ", "snooze ", "doze ", "unwind ", + "kindle ", "play ", "hang ", "believe ", "doubt ", +}; +static constexpr size_t kNumVerbs = sizeof(Verbs) / sizeof(Verbs[0]); + +const char* Adjectives[] = { + "furious ", "sly ", "careful ", "blithe ", "quick ", "fluffy ", "slow ", + "quiet ", "ruthless ", "thin ", "close ", "dogged ", "daring ", "brave ", + "stealthy ", "permanent ", "enticing ", "idle ", "busy ", "regular ", "final ", + "ironic ", "even ", "bold ", "silent ", +}; +static constexpr size_t kNumAdjectives = sizeof(Adjectives) / sizeof(Adjectives[0]); + +const char* Adverbs[] = { + "sometimes ", "always ", "never ", "furiously ", "slyly ", "carefully ", + "blithely ", "quickly ", "fluffily ", "slowly ", "quietly ", "ruthlessly ", + "thinly ", "closely ", "doggedly ", "daringly ", "bravely ", "stealthily ", + "permanently ", "enticingly ", "idly ", "busily ", "regularly ", "finally ", + "ironically ", "evenly ", "boldly ", "silently ", +}; +static constexpr size_t kNumAdverbs = sizeof(Adverbs) / sizeof(Adverbs[0]); + +const char* Prepositions[] = { + "about ", "above ", "according to ", "across ", "after ", "against ", + "along ", "alongside of ", "among ", "around ", "at ", "atop ", + "before ", "behind ", "beneath ", "beside ", "besides ", "between ", + "beyond ", "beyond ", "by ", "despite ", "during ", "except ", + "for ", "from ", "in place of ", "inside ", "instead of ", "into ", + "near ", "of ", "on ", "outside ", "over ", "past ", + "since ", "through ", "throughout ", "to ", "toward ", "under ", + "until ", "up ", "upon ", "without ", "with ", "within ", +}; +static constexpr size_t kNumPrepositions = sizeof(Prepositions) / sizeof(Prepositions[0]); + +const char* Auxiliaries[] = { + "do ", + "may ", + "might ", + "shall ", + "will ", + "would ", + "can ", + "could ", + "should ", + "ought to ", + "must ", + "will have to ", + "shall have to ", + "could have to ", + "should have to ", + "must have to ", + "need to ", + "try to ", +}; +static constexpr size_t kNumAuxiliaries = sizeof(Auxiliaries) / sizeof(Auxiliaries[0]); + +const char* Terminators[] = { + ".", ";", ":", "?", "!", "--", +}; +static constexpr size_t kNumTerminators = sizeof(Terminators) / sizeof(Terminators[0]); + +// The spec says to generate a 300 MB string according to a grammar. This is a +// concurrent implementation of the generator. Each thread generates the text in +// (up to) 8KB chunks of text. The generator maintains a cursor into the +// 300 MB buffer. After generating the chunk, the cursor is incremented +// to reserve space, and the chunk is memcpy-d in. +// This text is used to generate the COMMENT columns. To generate a comment, the spec +// says to pick a random length and a random offset into the 300 MB buffer (it does +// not specify it should be word/sentence aligned), and that slice of text becomes +// the comment. +class TpchPseudotext { + public: + Status EnsureInitialized(random::pcg32_fast& rng); + Result<Datum> GenerateComments(size_t num_comments, size_t min_length, + size_t max_length, random::pcg32_fast& rng); + + private: + bool GenerateWord(int64_t& offset, random::pcg32_fast& rng, char* arr, + const char** words, size_t num_choices); + bool GenerateNoun(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GenerateVerb(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GenerateAdjective(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GenerateAdverb(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GeneratePreposition(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GenerateAuxiliary(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GenerateTerminator(int64_t& offset, random::pcg32_fast& rng, char* arr); + + bool GenerateNounPhrase(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GenerateVerbPhrase(int64_t& offset, random::pcg32_fast& rng, char* arr); + bool GeneratePrepositionalPhrase(int64_t& offset, random::pcg32_fast& rng, char* arr); + + bool GenerateSentence(int64_t& offset, random::pcg32_fast& rng, char* arr); + + std::atomic<bool> done_ = {false}; + int64_t generated_offset_{0}; + std::mutex text_guard_; + std::unique_ptr<Buffer> text_; + static constexpr int64_t kChunkSize = 8192; + static constexpr int64_t kTextBytes = 300 * 1024 * 1024; // 300 MB +}; + +static TpchPseudotext g_text; + +Status TpchPseudotext::EnsureInitialized(random::pcg32_fast& rng) { + if (done_.load()) return Status::OK(); + + { + std::lock_guard<std::mutex> lock(text_guard_); + if (!text_) { + ARROW_ASSIGN_OR_RAISE(text_, AllocateBuffer(kTextBytes)); + } + } + char* out = reinterpret_cast<char*>(text_->mutable_data()); + char temp_buff[kChunkSize]; + + while (!done_.load()) { + int64_t known_valid_offset = 0; + int64_t try_offset = 0; + while (GenerateSentence(try_offset, rng, temp_buff)) known_valid_offset = try_offset; + + bool last_one; + int64_t offset; + int64_t memcpy_size; + { + std::lock_guard<std::mutex> lock(text_guard_); + if (done_.load()) return Status::OK(); + int64_t bytes_remaining = kTextBytes - generated_offset_; + memcpy_size = std::min(known_valid_offset, bytes_remaining); + offset = generated_offset_; + generated_offset_ += memcpy_size; + last_one = generated_offset_ == kTextBytes; + } + std::memcpy(out + offset, temp_buff, memcpy_size); + if (last_one) done_.store(true); + } + return Status::OK(); +} + +Result<Datum> TpchPseudotext::GenerateComments(size_t num_comments, size_t min_length, + size_t max_length, + random::pcg32_fast& rng) { + RETURN_NOT_OK(EnsureInitialized(rng)); + std::uniform_int_distribution<size_t> length_dist(min_length, max_length); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buffer, + AllocateBuffer(sizeof(int32_t) * (num_comments + 1))); + int32_t* offsets = reinterpret_cast<int32_t*>(offset_buffer->mutable_data()); + offsets[0] = 0; + for (size_t i = 1; i <= num_comments; i++) + offsets[i] = offsets[i - 1] + static_cast<int32_t>(length_dist(rng)); + + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> comment_buffer, + AllocateBuffer(offsets[num_comments])); + char* comments = reinterpret_cast<char*>(comment_buffer->mutable_data()); + for (size_t i = 0; i < num_comments; i++) { + size_t length = offsets[i + 1] - offsets[i]; + std::uniform_int_distribution<size_t> offset_dist(0, kTextBytes - length); + size_t offset_in_text = offset_dist(rng); + std::memcpy(comments + offsets[i], text_->data() + offset_in_text, length); + } + ArrayData ad(utf8(), num_comments, + {nullptr, std::move(offset_buffer), std::move(comment_buffer)}); + return std::move(ad); +} + +bool TpchPseudotext::GenerateWord(int64_t& offset, random::pcg32_fast& rng, char* arr, + const char** words, size_t num_choices) { + std::uniform_int_distribution<size_t> dist(0, num_choices - 1); + const char* word = words[dist(rng)]; + size_t length = std::strlen(word); + if (offset + length > kChunkSize) return false; + std::memcpy(arr + offset, word, length); + offset += length; + return true; +} + +bool TpchPseudotext::GenerateNoun(int64_t& offset, random::pcg32_fast& rng, char* arr) { + return GenerateWord(offset, rng, arr, Nouns, kNumNouns); +} + +bool TpchPseudotext::GenerateVerb(int64_t& offset, random::pcg32_fast& rng, char* arr) { + return GenerateWord(offset, rng, arr, Verbs, kNumVerbs); +} + +bool TpchPseudotext::GenerateAdjective(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + return GenerateWord(offset, rng, arr, Adjectives, kNumAdjectives); +} + +bool TpchPseudotext::GenerateAdverb(int64_t& offset, random::pcg32_fast& rng, char* arr) { + return GenerateWord(offset, rng, arr, Adverbs, kNumAdverbs); +} + +bool TpchPseudotext::GeneratePreposition(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + return GenerateWord(offset, rng, arr, Prepositions, kNumPrepositions); +} + +bool TpchPseudotext::GenerateAuxiliary(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + return GenerateWord(offset, rng, arr, Auxiliaries, kNumAuxiliaries); +} + +bool TpchPseudotext::GenerateTerminator(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + bool result = GenerateWord(offset, rng, arr, Terminators, kNumTerminators); + // Swap the space with the terminator + if (result) std::swap(*(arr + offset - 2), *(arr + offset - 1)); + return result; +} + +bool TpchPseudotext::GenerateNounPhrase(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + std::uniform_int_distribution<size_t> dist(0, 3); + const char* comma_space = ", "; + bool success = true; + switch (dist(rng)) { + case 0: + success &= GenerateNoun(offset, rng, arr); + break; + case 1: + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateNoun(offset, rng, arr); + break; + case 2: + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateWord(--offset, rng, arr, &comma_space, 1); + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateNoun(offset, rng, arr); + break; + case 3: + success &= GenerateAdverb(offset, rng, arr); + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateNoun(offset, rng, arr); + break; + default: + Unreachable("Random number should be between 0 and 3 inclusive"); + break; + } + return success; +} + +bool TpchPseudotext::GenerateVerbPhrase(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + std::uniform_int_distribution<size_t> dist(0, 3); + bool success = true; + switch (dist(rng)) { + case 0: + success &= GenerateVerb(offset, rng, arr); + break; + case 1: + success &= GenerateAuxiliary(offset, rng, arr); + success &= GenerateVerb(offset, rng, arr); + break; + case 2: + success &= GenerateVerb(offset, rng, arr); + success &= GenerateAdverb(offset, rng, arr); + break; + case 3: + success &= GenerateAuxiliary(offset, rng, arr); + success &= GenerateVerb(offset, rng, arr); + success &= GenerateAdverb(offset, rng, arr); + break; + default: + Unreachable("Random number should be between 0 and 3 inclusive"); + break; + } + return success; +} + +bool TpchPseudotext::GeneratePrepositionalPhrase(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + const char* the_space = "the "; + bool success = true; + success &= GeneratePreposition(offset, rng, arr); + success &= GenerateWord(offset, rng, arr, &the_space, 1); + success &= GenerateNounPhrase(offset, rng, arr); + return success; +} + +bool TpchPseudotext::GenerateSentence(int64_t& offset, random::pcg32_fast& rng, + char* arr) { + std::uniform_int_distribution<size_t> dist(0, 4); + bool success = true; + switch (dist(rng)) { + case 0: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 1: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 2: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 3: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 4: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + default: + Unreachable("Random number should be between 0 and 5 inclusive"); + break; + } + return success; +} + +class TpchTableGenerator { + public: + using OutputBatchCallback = std::function<void(ExecBatch)>; + using FinishedCallback = std::function<void(int64_t)>; + using GenerateFn = std::function<Status(size_t)>; + using ScheduleCallback = std::function<Status(GenerateFn)>; + using AbortCallback = std::function<void()>; + + virtual Status Init(std::vector<std::string> columns, float scale_factor, + int64_t batch_size) = 0; + + virtual Status StartProducing(size_t num_threads, OutputBatchCallback output_callback, + FinishedCallback finished_callback, + ScheduleCallback schedule_callback) = 0; + + bool Abort() { + bool expected = false; + return done_.compare_exchange_strong(expected, true); + } + + virtual std::shared_ptr<Schema> schema() const = 0; + + virtual ~TpchTableGenerator() = default; + + protected: + std::atomic<bool> done_ = {false}; + std::atomic<int64_t> batches_outputted_ = {0}; +}; + +int GetNumDigits(int64_t x) { + // This if statement chain is for MAXIMUM SPEED + // Source: + // https://stackoverflow.com/questions/1068849/how-do-i-determine-the-number-of-digits-of-an-integer-in-c + ARROW_DCHECK(x >= 0); + if (x < 10ll) return 1; + if (x < 100ll) return 2; + if (x < 1000ll) return 3; + if (x < 10000ll) return 4; + if (x < 100000ll) return 5; + if (x < 1000000ll) return 6; + if (x < 10000000ll) return 7; + if (x < 100000000ll) return 8; + if (x < 1000000000ll) return 9; + if (x < 10000000000ll) return 10; + if (x < 100000000000ll) return 11; + if (x < 1000000000000ll) return 12; + if (x < 10000000000000ll) return 13; + if (x < 100000000000000ll) return 14; + if (x < 1000000000000000ll) return 15; + if (x < 10000000000000000ll) return 16; + if (x < 100000000000000000ll) return 17; + if (x < 1000000000000000000ll) return 18; + return -1; +} + +void AppendNumberPaddedToNineDigits(char* out, int64_t x) { + // We do all of this to avoid calling snprintf, which needs to handle locale, + // which can be slow, especially on Mac and Windows. + int num_digits = GetNumDigits(x); + int num_padding_zeros = std::max(9 - num_digits, 0); + std::memset(out, '0', static_cast<size_t>(num_padding_zeros)); + while (x > 0) { + *(out + num_padding_zeros + num_digits - 1) = ('0' + x % 10); + num_digits -= 1; + x /= 10; + } +} + +Result<std::shared_ptr<Schema>> SetOutputColumns( + const std::vector<std::string>& columns, + const std::vector<std::shared_ptr<DataType>>& types, + const std::unordered_map<std::string, int>& name_map, std::vector<int>& gen_list) { + gen_list.clear(); + std::vector<std::shared_ptr<Field>> fields; + if (columns.empty()) { + fields.resize(name_map.size()); + gen_list.resize(name_map.size()); + for (auto pair : name_map) { + int col_idx = pair.second; + fields[col_idx] = field(pair.first, types[col_idx]); + gen_list[col_idx] = col_idx; + } + return schema(std::move(fields)); + } else { + for (const std::string& col : columns) { + auto entry = name_map.find(col); + if (entry == name_map.end()) return Status::Invalid("Not a valid column name"); + int col_idx = static_cast<int>(entry->second); + fields.push_back(field(col, types[col_idx])); + gen_list.push_back(col_idx); + } + return schema(std::move(fields)); + } +} + +Result<Datum> RandomVString(random::pcg32_fast& rng, int64_t num_rows, int32_t min_length, + int32_t max_length) { + std::uniform_int_distribution<int32_t> length_dist(min_length, max_length); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buff, + AllocateBuffer((num_rows + 1) * sizeof(int32_t))); + int32_t* offsets = reinterpret_cast<int32_t*>(offset_buff->mutable_data()); + offsets[0] = 0; + for (int64_t i = 1; i <= num_rows; i++) offsets[i] = offsets[i - 1] + length_dist(rng); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> str_buff, + AllocateBuffer(offsets[num_rows])); + char* str = reinterpret_cast<char*>(str_buff->mutable_data()); + + // Spec says to pick random alphanumeric characters from a set of at least + // 64 symbols. Now, let's think critically here: 26 letters in the alphabet, + // so 52 total for upper and lower case, and 10 possible digits gives 62 + // characters... + // dbgen solves this by including a space and a comma as well, so we'll + // copy that. + const char alpha_numerics[65] = + "0123456789abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ,"; + std::uniform_int_distribution<int> char_dist(0, 63); + for (int32_t i = 0; i < offsets[num_rows]; i++) str[i] = alpha_numerics[char_dist(rng)]; + + ArrayData ad(utf8(), num_rows, {nullptr, std::move(offset_buff), std::move(str_buff)}); + return std::move(ad); +} + +void AppendNumber(char*& out, int num_digits, int32_t x) { + out += (num_digits - 1); + while (x > 0) { + *out-- = '0' + (x % 10); + x /= 10; + } + out += (num_digits + 1); +} + +void GeneratePhoneNumber(char* out, random::pcg32_fast& rng, int32_t country) { + std::uniform_int_distribution<int32_t> three_digit(100, 999); + std::uniform_int_distribution<int32_t> four_digit(1000, 9999); + + int32_t country_code = country + 10; + int32_t l1 = three_digit(rng); + int32_t l2 = three_digit(rng); + int32_t l3 = four_digit(rng); + AppendNumber(out, 2, country_code); + *out++ = '-'; + AppendNumber(out, 3, l1); + *out++ = '-'; + AppendNumber(out, 3, l2); + *out++ = '-'; + AppendNumber(out, 4, l3); +} + +static constexpr uint32_t kStartDate = + 8035; // January 1, 1992 is 8035 days after January 1, 1970 +static constexpr uint32_t kCurrentDate = + 9298; // June 17, 1995 is 9298 days after January 1, 1970 +static constexpr uint32_t kEndDate = + 10591; // December 12, 1998 is 10591 days after January 1, 1970 + +using GenerateColumnFn = std::function<Status(size_t)>; +class PartAndPartSupplierGenerator { + public: + Status Init(size_t num_threads, int64_t batch_size, float scale_factor) { + if (!inited_) { + inited_ = true; + batch_size_ = batch_size; + scale_factor_ = scale_factor; + + arrow_vendored::pcg_extras::seed_seq_from<std::random_device> seq; Review comment: Agreed. We can take a seed in the constructor. If not specified I think defaulting to non-deterministic is ok. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
