save-buffer commented on a change in pull request #12537: URL: https://github.com/apache/arrow/pull/12537#discussion_r825009715
########## File path: cpp/src/arrow/compute/exec/tpch_node.cc ########## @@ -0,0 +1,3836 @@ +#include "arrow/compute/exec/tpch_node.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/future.h" +#include "arrow/util/unreachable.h" + +#include <algorithm> +#include <bitset> +#include <cstring> +#include <random> +#include <vector> +#include <memory> +#include <mutex> +#include <queue> +#include <unordered_set> + +namespace arrow +{ + using internal::checked_cast; + + namespace compute + { + class TpchText + { + public: + Status InitIfNeeded(random::pcg32_fast &rng); + Result<Datum> GenerateComments( + size_t num_comments, + size_t min_length, + size_t max_length, + random::pcg32_fast &rng); + + private: + bool GenerateWord(int64_t &offset, random::pcg32_fast &rng, char *arr, const char **words, size_t num_choices); + bool GenerateNoun(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GenerateVerb(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GenerateAdjective(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GenerateAdverb(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GeneratePreposition(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GenerateAuxiliary(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GenerateTerminator(int64_t &offset, random::pcg32_fast &rng, char *arr); + + bool GenerateNounPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GenerateVerbPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr); + bool GeneratePrepositionalPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr); + + bool GenerateSentence(int64_t &offset, random::pcg32_fast &rng, char *arr); + + std::atomic<bool> done_ = { false }; + int64_t generated_offset_ = 0; + std::mutex text_guard_; + std::unique_ptr<Buffer> text_; + static constexpr int64_t kChunkSize = 8192; + static constexpr int64_t kTextBytes = 300 * 1024 * 1024; // 300 MB + }; + + class TpchTableGenerator + { + public: + using OutputBatchCallback = std::function<void(ExecBatch)>; + using FinishedCallback = std::function<void(int64_t)>; + using GenerateFn = std::function<Status(size_t)>; + using ScheduleCallback = std::function<Status(GenerateFn)>; + using AbortCallback = std::function<void()>; + + virtual Status Init( + std::vector<std::string> columns, + float scale_factor, + int64_t batch_size) = 0; + + virtual Status StartProducing( + size_t num_threads, + OutputBatchCallback output_callback, + FinishedCallback finished_callback, + ScheduleCallback schedule_callback) = 0; + + void Abort(AbortCallback abort_callback) + { + bool expected = false; + if(done_.compare_exchange_strong(expected, true)) + { + abort_callback(); + } + } + + virtual std::shared_ptr<Schema> schema() const = 0; + + virtual ~TpchTableGenerator() = default; + + protected: + std::atomic<bool> done_ = { false }; + std::atomic<int64_t> batches_outputted_ = { 0 }; + }; + + int GetNumDigits(int64_t x) + { + // This if statement chain is for MAXIMUM SPEED + /* + ., + . _,'f----.._ + |\ ,-'"/ | ,' + |,_ ,--. / + /,-. ,'`. (_ + f o| o|__ "`-. + ,-._.,--'_ `. _.,-` + `"' ___.,'` j,-' + `-.__.,--' + */ + // Source: https://stackoverflow.com/questions/1068849/how-do-i-determine-the-number-of-digits-of-an-integer-in-c + ARROW_DCHECK(x >= 0); + if(x < 10ll) return 1; + if(x < 100ll) return 2; + if(x < 1000ll) return 3; + if(x < 10000ll) return 4; + if(x < 100000ll) return 5; + if(x < 1000000ll) return 6; + if(x < 10000000ll) return 7; + if(x < 100000000ll) return 8; + if(x < 1000000000ll) return 9; + if(x < 10000000000ll) return 10; + if(x < 100000000000ll) return 11; + if(x < 1000000000000ll) return 12; + if(x < 10000000000000ll) return 13; + if(x < 100000000000000ll) return 14; + if(x < 1000000000000000ll) return 15; + if(x < 10000000000000000ll) return 16; + if(x < 100000000000000000ll) return 17; + if(x < 1000000000000000000ll) return 18; + return -1; + } + + void AppendNumberPaddedToNineDigits(char *out, int64_t x) + { + // We do all of this to avoid calling snprintf, which does a lot of crazy + // locale stuff. On Windows and MacOS this can get suuuuper slow + int num_digits = GetNumDigits(x); + int num_padding_zeros = std::max(9 - num_digits, 0); + std::memset(out, '0', static_cast<size_t>(num_padding_zeros)); + while(x > 0) + { + *(out + num_padding_zeros + num_digits - 1) = ('0' + x % 10); + num_digits -= 1; + x /= 10; + } + } + + Result<std::shared_ptr<Schema>> SetOutputColumns( + const std::vector<std::string> &columns, + const std::vector<std::shared_ptr<DataType>> &types, + const std::unordered_map<std::string, int> &name_map, + std::vector<int> &gen_list) + { + gen_list.clear(); + std::vector<std::shared_ptr<Field>> fields; + if(columns.empty()) + { + fields.resize(name_map.size()); + gen_list.resize(name_map.size()); + for(auto pair : name_map) + { + int col_idx = pair.second; + fields[col_idx] = field(pair.first, types[col_idx]); + gen_list[col_idx] = col_idx; + } + return schema(std::move(fields)); + } + else + { + for(const std::string &col : columns) + { + auto entry = name_map.find(col); + if(entry == name_map.end()) + return Status::Invalid("Not a valid column name"); + int col_idx = static_cast<int>(entry->second); + fields.push_back(field(col, types[col_idx])); + gen_list.push_back(col_idx); + } + return schema(std::move(fields)); + } + } + + static TpchText g_text; + + Status TpchText::InitIfNeeded(random::pcg32_fast &rng) + { + if(done_.load()) + return Status::OK(); + + { + std::lock_guard<std::mutex> lock(text_guard_); + if(!text_) + { + ARROW_ASSIGN_OR_RAISE(text_, AllocateBuffer(kTextBytes)); + } + } + char *out = reinterpret_cast<char *>(text_->mutable_data()); + char temp_buff[kChunkSize]; + while(done_.load() == false) + { + int64_t known_valid_offset = 0; + int64_t try_offset = 0; + while(GenerateSentence(try_offset, rng, temp_buff)) + known_valid_offset = try_offset; + + { + std::lock_guard<std::mutex> lock(text_guard_); + if(done_.load()) + return Status::OK(); + int64_t bytes_remaining = kTextBytes - generated_offset_; + int64_t memcpy_size = std::min(known_valid_offset, bytes_remaining); + std::memcpy(out + generated_offset_, temp_buff, memcpy_size); + generated_offset_ += memcpy_size; + if(generated_offset_ == kTextBytes) + done_.store(true); + } + } + return Status::OK(); + } + + Result<Datum> TpchText::GenerateComments( + size_t num_comments, + size_t min_length, + size_t max_length, + random::pcg32_fast &rng) + { + RETURN_NOT_OK(InitIfNeeded(rng)); + std::uniform_int_distribution<size_t> length_dist(min_length, max_length); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buffer, AllocateBuffer(sizeof(int32_t) * (num_comments + 1))); + int32_t *offsets = reinterpret_cast<int32_t *>(offset_buffer->mutable_data()); + offsets[0] = 0; + for(size_t i = 1; i <= num_comments; i++) + offsets[i] = offsets[i - 1] + length_dist(rng); + + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> comment_buffer, AllocateBuffer(offsets[num_comments])); + char *comments = reinterpret_cast<char *>(comment_buffer->mutable_data()); + for(size_t i = 0; i < num_comments; i++) + { + size_t length = offsets[i + 1] - offsets[i]; + std::uniform_int_distribution<size_t> offset_dist(0, kTextBytes - length); + size_t offset_in_text = offset_dist(rng); + std::memcpy(comments + offsets[i], text_->data() + offset_in_text, length); + } + ArrayData ad(utf8(), num_comments, { nullptr, std::move(offset_buffer), std::move(comment_buffer) }); + return std::move(ad); + } + + Result<Datum> RandomVString( + random::pcg32_fast &rng, + int64_t num_rows, + int32_t min_length, + int32_t max_length) + { + std::uniform_int_distribution<int32_t> length_dist(min_length, max_length); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> offset_buff, AllocateBuffer((num_rows + 1) * sizeof(int32_t))); + int32_t *offsets = reinterpret_cast<int32_t *>(offset_buff->mutable_data()); + offsets[0] = 0; + for(int64_t i = 1; i <= num_rows; i++) + offsets[i] = offsets[i - 1] + length_dist(rng); + ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> str_buff, AllocateBuffer(offsets[num_rows])); + char *str = reinterpret_cast<char *>(str_buff->mutable_data()); + + // Spec says to pick random alphanumeric characters from a set of at least + // 64 symbols. Now, let's think critically here: 26 letters in the alphabet, + // so 52 total for upper and lower case, and 10 possible digits gives 62 + // characters... + // dbgen solves this by including a space and a comma as well, so we'll + // copy that. + const char alpha_numerics[65] = + "0123456789abcdefghijklmnopqrstuvwxyz ABCDEFGHIJKLMNOPQRSTUVWXYZ,"; + std::uniform_int_distribution<int> char_dist(0, 63); + for(int32_t i = 0; i < offsets[num_rows]; i++) + str[i] = alpha_numerics[char_dist(rng)]; + + ArrayData ad(utf8(), num_rows, { nullptr, std::move(offset_buff), std::move(str_buff) }); + return std::move(ad); + } + + void AppendNumber(char *&out, int num_digits, int32_t x) + { + out += (num_digits - 1); + while(x > 0) + { + *out-- = '0' + (x % 10); + x /= 10; + } + out += (num_digits + 1); + } + + void GeneratePhoneNumber( + char *out, + random::pcg32_fast &rng, + int32_t country) + { + std::uniform_int_distribution<int32_t> three_digit(100, 999); + std::uniform_int_distribution<int32_t> four_digit(1000, 9999); + + int32_t country_code = country + 10; + int32_t l1 = three_digit(rng); + int32_t l2 = three_digit(rng); + int32_t l3 = four_digit(rng); + AppendNumber(out, 2, country_code); + *out++ = '-'; + AppendNumber(out, 3, l1); + *out++ = '-'; + AppendNumber(out, 3, l2); + *out++ = '-'; + AppendNumber(out, 4, l3); + } + + static constexpr uint32_t STARTDATE = 8035; // January 1, 1992 is 8035 days after January 1, 1970 + static constexpr uint32_t CURRENTDATE = 9298; // June 17, 1995 is 9298 days after January 1, 1970 + static constexpr uint32_t ENDDATE = 10591; // December 12, 1998 is 10591 days after January 1, 1970 + + const char *NameParts[] = + { + "almond", "antique", "aquamarine", "azure", "beige", "bisque", "black", "blanched", "blue", + "blush", "brown", "burlywood", "burnished", "chartreuse", "chiffon", "chocolate", "coral", + "cornflower", "cornsilk", "cream", "cyan", "dark", "deep", "dim", "dodger", "drab", "firebrick", + "floral", "forest", "frosted", "gainsboro", "ghost", "goldenrod", "green", "grey", "honeydew", + "hot", "indian", "ivory", "khaki", "lace", "lavender", "lawn", "lemon", "light", "lime", "linen", + "magenta", "maroon", "medium", "metallic", "midnight", "mint", "misty", "moccasin", "navajo", + "navy", "olive", "orange", "orchid", "pale", "papaya", "peach", "peru", "pink", "plum", "powder", + "puff", "purple", "red", "rose", "rosy", "royal", "saddle", "salmon", "sandy", "seashell", "sienna", + "sky", "slate", "smoke", "snow", "spring", "steel", "tan", "thistle", "tomato", "turquoise", "violet", + "wheat", "white", "yellow", + }; + static constexpr size_t kNumNameParts = sizeof(NameParts) / sizeof(NameParts[0]); + + const char *Types_1[] = + { + "STANDARD ", "SMALL ", "MEDIUM ", "LARGE ", "ECONOMY ", "PROMO ", + }; + static constexpr size_t kNumTypes_1 = sizeof(Types_1) / sizeof(Types_1[0]); + + const char *Types_2[] = + { + "ANODIZED ", "BURNISHED ", "PLATED ", "POLISHED ", "BRUSHED ", + }; + static constexpr size_t kNumTypes_2 = sizeof(Types_2) / sizeof(Types_2[0]); + + const char *Types_3[] = + { + "TIN", "NICKEL", "BRASS", "STEEL", "COPPER", + }; + static constexpr size_t kNumTypes_3 = sizeof(Types_3) / sizeof(Types_3[0]); + + const char *Containers_1[] = + { + "SM ", "LG ", "MD ", "JUMBO ", "WRAP ", + }; + static constexpr size_t kNumContainers_1 = sizeof(Containers_1) / sizeof(Containers_1[0]); + + const char *Containers_2[] = + { + "CASE", "BOX", "BAG", "JAR", "PKG", "PACK", "CAN", "DRUM", + }; + static constexpr size_t kNumContainers_2 = sizeof(Containers_2) / sizeof(Containers_2[0]); + + const char *Segments[] = + { + "AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD", + }; + static constexpr size_t kNumSegments = sizeof(Segments) / sizeof(Segments[0]); + + const char *Priorities[] = + { + "1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW", + }; + static constexpr size_t kNumPriorities = sizeof(Priorities) / sizeof(Priorities[0]); + + const char *Instructions[] = + { + "DELIVER IN PERSON", "COLLECT COD", "NONE", "TAKE BACK RETURN", + }; + static constexpr size_t kNumInstructions = sizeof(Instructions) / sizeof(Instructions[0]); + + const char *Modes[] = + { + "REG AIR", "AIR", "RAIL", "SHIP", "TRUCK", "MAIL", "FOB", + }; + static constexpr size_t kNumModes = sizeof(Modes) / sizeof(Modes[0]); + + const char *Nouns[] = + { + "foxes ", "ideas ", "theodolites ", "pinto beans ", "instructions ", "dependencies ", "excuses ", + "platelets ", "asymptotes ", "courts ", "dolphins ", "multipliers ", "sautemes ", "warthogs ", "frets ", + "dinos ", "attainments ", "somas ", "Tiresias '", "patterns ", "forges ", "braids ", "hockey players ", "frays ", + "warhorses ", "dugouts ", "notomis ", "epitaphs ", "pearls ", "tithes ", "waters ", "orbits ", "gifts ", "sheaves ", + "depths ", "sentiments ", "decoys ", "realms ", "pains ", "grouches ", "escapades ", + }; + static constexpr size_t kNumNouns = sizeof(Nouns) / sizeof(Nouns[0]); + + const char *Verbs[] = + { + "sleep ", "wake ", "are ", "cajole ", "haggle ", "nag ", "use ", "boost ", "affix ", "detect ", "integrate ", + "maintain ", "nod ", "was ", "lose ", "sublate ", "solve ", "thrash ", "promise ", "engage ", "hinder ", + "print ", "x-ray ", "breach ", "eat ", "grow ", "impress ", "mold ", "poach ", "serve ", "run ", "dazzle ", + "snooze ", "doze ", "unwind ", "kindle ", "play ", "hang ", "believe ", "doubt ", + }; + static constexpr size_t kNumVerbs = sizeof(Verbs) / sizeof(Verbs[0]); + + const char *Adjectives[] = + { + "furious ", "sly ", "careful ", "blithe ", "quick ", "fluffy ", "slow ", "quiet ", "ruthless ", "thin ", + "close ", "dogged ", "daring ", "brave ", "stealthy ", "permanent ", "enticing ", "idle ", "busy ", + "regular ", "final ", "ironic ", "even ", "bold ", "silent ", + }; + static constexpr size_t kNumAdjectives = sizeof(Adjectives) / sizeof(Adjectives[0]); + + const char *Adverbs[] = + { + "sometimes ", "always ", "never ", "furiously ", "slyly ", "carefully ", "blithely ", "quickly ", "fluffily ", + "slowly ", "quietly ", "ruthlessly ", "thinly ", "closely ", "doggedly ", "daringly ", "bravely ", "stealthily ", + "permanently ", "enticingly ", "idly ", "busily ", "regularly ", "finally ", "ironically ", "evenly ", "boldly ", + "silently ", + }; + static constexpr size_t kNumAdverbs = sizeof(Adverbs) / sizeof(Adverbs[0]); + + const char *Prepositions[] = + { + "about ", "above ", "according to ", "across ", "after ", "against ", "along ", "alongside of ", "among ", + "around ", "at ", "atop ", "before ", "behind ", "beneath ", "beside ", "besides ", "between ", "beyond ", + "beyond ", "by ", "despite ", "during ", "except ", "for ", "from ", "in place of ", "inside ", "instead of ", + "into ", "near ", "of ", "on ", "outside ", "over ", "past ", "since ", "through ", "throughout ", "to ", + "toward ", "under ", "until ", "up ", "upon ", "without ", "with ", "within ", + }; + static constexpr size_t kNumPrepositions = sizeof(Prepositions) / sizeof(Prepositions[0]); + + const char *Auxiliaries[] = + { + "do ", "may ", "might ", "shall ", "will ", "would ", "can ", "could ", "should ", "ought to ", "must ", + "will have to ", "shall have to ", "could have to ", "should have to ", "must have to ", "need to ", "try to ", + }; + static constexpr size_t kNumAuxiliaries = sizeof(Auxiliaries) / sizeof(Auxiliaries[0]); + + const char *Terminators[] = + { + ".", ";", ":", "?", "!", "--", + }; + static constexpr size_t kNumTerminators = sizeof(Terminators) / sizeof(Terminators[0]); + + bool TpchText::GenerateWord(int64_t &offset, random::pcg32_fast &rng, char *arr, const char **words, size_t num_choices) + { + std::uniform_int_distribution<size_t> dist(0, num_choices - 1); + const char *word = words[dist(rng)]; + size_t length = std::strlen(word); + if(offset + length > kChunkSize) + return false; + std::memcpy(arr + offset, word, length); + offset += length; + return true; + } + + bool TpchText::GenerateNoun(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + return GenerateWord(offset, rng, arr, Nouns, kNumNouns); + } + + bool TpchText::GenerateVerb(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + return GenerateWord(offset, rng, arr, Verbs, kNumVerbs); + } + + bool TpchText::GenerateAdjective(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + return GenerateWord(offset, rng, arr, Adjectives, kNumAdjectives); + } + + bool TpchText::GenerateAdverb(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + return GenerateWord(offset, rng, arr, Adverbs, kNumAdverbs); + } + + bool TpchText::GeneratePreposition(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + return GenerateWord(offset, rng, arr, Prepositions, kNumPrepositions); + } + + bool TpchText::GenerateAuxiliary(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + return GenerateWord(offset, rng, arr, Auxiliaries, kNumAuxiliaries); + } + + bool TpchText::GenerateTerminator(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + bool result = GenerateWord(offset, rng, arr, Terminators, kNumTerminators); + // Swap the space with the terminator + if(result) + std::swap(*(arr + offset - 2), *(arr + offset - 1)); + return result; + } + + bool TpchText::GenerateNounPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + std::uniform_int_distribution<size_t> dist(0, 3); + const char *comma_space = ", "; + bool success = true; + switch(dist(rng)) + { + case 0: + success &= GenerateNoun(offset, rng, arr); + break; + case 1: + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateNoun(offset, rng, arr); + break; + case 2: + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateWord(--offset, rng, arr, &comma_space, 1); + success &= GenerateAdjective(offset, rng, arr); + success &= GenerateNoun(offset, rng, arr); + break; + case 3: + GenerateAdverb(offset, rng, arr); + GenerateAdjective(offset, rng, arr); + GenerateNoun(offset, rng, arr); + break; + default: + Unreachable("Random number should be between 0 and 3 inclusive"); + break; + } + return success; + } + + bool TpchText::GenerateVerbPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + std::uniform_int_distribution<size_t> dist(0, 3); + bool success = true; + switch(dist(rng)) + { + case 0: + success &= GenerateVerb(offset, rng, arr); + break; + case 1: + success &= GenerateAuxiliary(offset, rng, arr); + success &= GenerateVerb(offset, rng, arr); + break; + case 2: + success &= GenerateVerb(offset, rng, arr); + success &= GenerateAdverb(offset, rng, arr); + break; + case 3: + success &= GenerateAuxiliary(offset, rng, arr); + success &= GenerateVerb(offset, rng, arr); + success &= GenerateAdverb(offset, rng, arr); + break; + default: + Unreachable("Random number should be between 0 and 3 inclusive"); + break; + } + return success; + } + + bool TpchText::GeneratePrepositionalPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + const char *the_space = "the "; + bool success = true; + success &= GeneratePreposition(offset, rng, arr); + success &= GenerateWord(offset, rng, arr, &the_space, 1); + success &= GenerateNounPhrase(offset, rng, arr); + return success; + } + + bool TpchText::GenerateSentence(int64_t &offset, random::pcg32_fast &rng, char *arr) + { + std::uniform_int_distribution<size_t> dist(0, 4); + bool success = true; + switch(dist(rng)) + { + case 0: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 1: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 2: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 3: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 4: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GenerateNounPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + case 5: + success &= GenerateNounPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateVerbPhrase(offset, rng, arr); + success &= GeneratePrepositionalPhrase(offset, rng, arr); + success &= GenerateTerminator(offset, rng, arr); + break; + default: + Unreachable("Random number should be between 0 and 5 inclusive"); + break; + } + return success; + } + + using GenerateColumnFn = std::function<Status(size_t)>; + class PartAndPartSupplierGenerator + { + public: + Status Init( + size_t num_threads, + int64_t batch_size, + float scale_factor) + { + if(!inited_) + { + inited_ = true; + batch_size_ = batch_size; + scale_factor_ = scale_factor; + + thread_local_data_.resize(num_threads); + for(ThreadLocalData &tld : thread_local_data_) + { + // 5 is the maximum number of different strings we need to concatenate + tld.string_indices.resize(5 * batch_size_); + } + part_rows_to_generate_ = static_cast<int64_t>(scale_factor_ * 200000); + } + return Status::OK(); + } + + int64_t part_batches_generated() const + { + return part_batches_generated_.load(); + } + + int64_t partsupp_batches_generated() const + { + return partsupp_batches_generated_.load(); + } + + Result<std::shared_ptr<Schema>> SetPartOutputColumns(const std::vector<std::string> &cols) + { + return SetOutputColumns(cols, part_types_, part_name_map_, part_cols_); + } + + Result<std::shared_ptr<Schema>> SetPartSuppOutputColumns(const std::vector<std::string> &cols) + { + return SetOutputColumns(cols, partsupp_types_, partsupp_name_map_, partsupp_cols_); + } + + Result<util::optional<ExecBatch>> NextPartBatch() + { + size_t thread_index = thread_indexer_(); + ThreadLocalData &tld = thread_local_data_[thread_index]; + { + std::lock_guard<std::mutex> lock(part_output_queue_mutex_); + bool all_generated = part_rows_generated_ == part_rows_to_generate_; Review comment: You can't move that if above because we may have generated all rows, but the queue may not be empty, so we need to check the queue first, and only then return if we've generated everything. I did get rid of the variable though and just pulled it into the `else if`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org