save-buffer commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r824961807



##########
File path: cpp/src/arrow/compute/exec/tpch_node.cc
##########
@@ -0,0 +1,3836 @@
+#include "arrow/compute/exec/tpch_node.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/future.h"
+#include "arrow/util/unreachable.h"
+
+#include <algorithm>
+#include <bitset>
+#include <cstring>
+#include <random>
+#include <vector>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <unordered_set>
+
+namespace arrow
+{
+    using internal::checked_cast;
+
+    namespace compute
+    {
+        // Text source for TPC-H style comment strings.
+        // NOTE(review): only the declarations are visible in this hunk. The helper
+        // names (noun / verb / phrase / sentence) suggest that random sentences are
+        // assembled into `text_` -- confirm against the definitions.
+        class TpchText
+        {
+        public:
+            // Sets up the generator's state using `rng`; reports failure via Status.
+            Status InitIfNeeded(random::pcg32_fast &rng);
+
+            // Produces `num_comments` comments as a Datum.
+            // NOTE(review): presumably each comment's length lies between
+            // `min_length` and `max_length` -- confirm against the definition.
+            Result<Datum> GenerateComments(
+                size_t num_comments,
+                size_t min_length,
+                size_t max_length,
+                random::pcg32_fast &rng);
+
+        private:
+            static constexpr int64_t kChunkSize = 8192;
+            static constexpr int64_t kTextBytes = 300 * 1024 * 1024; // 300 MB
+
+            // Word-level helpers. Each takes the current `offset`, the random
+            // engine and the destination character array `arr`.
+            bool GenerateWord(int64_t &offset, random::pcg32_fast &rng, char *arr, const char **words, size_t num_choices);
+            bool GenerateNoun(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GenerateVerb(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GenerateAdjective(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GenerateAdverb(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GeneratePreposition(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GenerateAuxiliary(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GenerateTerminator(int64_t &offset, random::pcg32_fast &rng, char *arr);
+
+            // Phrase-level helpers.
+            bool GenerateNounPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GenerateVerbPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr);
+            bool GeneratePrepositionalPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr);
+
+            // Sentence-level helper.
+            bool GenerateSentence(int64_t &offset, random::pcg32_fast &rng, char *arr);
+
+            std::atomic<bool> done_{false};
+            int64_t generated_offset_ = 0;
+            std::mutex text_guard_;
+            std::unique_ptr<Buffer> text_;
+        };
+
+        // Abstract interface implemented by the per-table generators.
+        // Concrete generators supply Init, StartProducing and schema(); this base
+        // class owns the shared `done_` flag and the output batch counter.
+        class TpchTableGenerator
+        {
+        public:
+            using OutputBatchCallback = std::function<void(ExecBatch)>;
+            using FinishedCallback = std::function<void(int64_t)>;
+            using GenerateFn = std::function<Status(size_t)>;
+            using ScheduleCallback = std::function<Status(GenerateFn)>;
+            using AbortCallback = std::function<void()>;
+
+            // Configures the generator with the requested columns, scale factor
+            // and batch size.
+            virtual Status Init(
+                std::vector<std::string> columns,
+                float scale_factor,
+                int64_t batch_size) = 0;
+
+            // Starts generation with the supplied thread count and callbacks.
+            virtual Status StartProducing(
+                size_t num_threads,
+                OutputBatchCallback output_callback,
+                FinishedCallback finished_callback,
+                ScheduleCallback schedule_callback) = 0;
+
+            // Flips `done_` from false to true and, only when this call performed
+            // the flip, invokes `abort_callback`. If `done_` was already true the
+            // callback is not invoked.
+            void Abort(AbortCallback abort_callback)
+            {
+                bool not_done = false;
+                const bool flipped = done_.compare_exchange_strong(not_done, true);
+                if(!flipped)
+                    return;
+                abort_callback();
+            }
+
+            // Schema of the batches this generator produces.
+            virtual std::shared_ptr<Schema> schema() const = 0;
+
+            virtual ~TpchTableGenerator() = default;
+
+        protected:
+            std::atomic<bool> done_{false};
+            std::atomic<int64_t> batches_outputted_{0};
+        };
+
+        // Returns the number of decimal digits needed to print `x`.
+        //
+        // `x` must be non-negative (checked with ARROW_DCHECK). Zero counts as one
+        // digit. The result is in [1, 19]: the largest int64_t value
+        // (9223372036854775807) has 19 digits, so anything at or above 10^18 is
+        // reported as 19 rather than falling through to an error sentinel.
+        int GetNumDigits(int64_t x)
+        {
+            // This if statement chain is for MAXIMUM SPEED
+            /*
+              .,
+              .      _,'f----.._
+              |\ ,-'"/  |     ,'
+              |,_  ,--.      /
+              /,-. ,'`.     (_
+              f  o|  o|__     "`-.
+              ,-._.,--'_ `.   _.,-`
+              `"' ___.,'` j,-'
+              `-.__.,--'
+             */
+            // Source: https://stackoverflow.com/questions/1068849/how-do-i-determine-the-number-of-digits-of-an-integer-in-c
+            ARROW_DCHECK(x >= 0);
+            if(x < 10ll) return 1;
+            if(x < 100ll) return 2;
+            if(x < 1000ll) return 3;
+            if(x < 10000ll) return 4;
+            if(x < 100000ll) return 5;
+            if(x < 1000000ll) return 6;
+            if(x < 10000000ll) return 7;
+            if(x < 100000000ll) return 8;
+            if(x < 1000000000ll) return 9;
+            if(x < 10000000000ll) return 10;
+            if(x < 100000000000ll) return 11;
+            if(x < 1000000000000ll) return 12;
+            if(x < 10000000000000ll) return 13;
+            if(x < 100000000000000ll) return 14;
+            if(x < 1000000000000000ll) return 15;
+            if(x < 10000000000000000ll) return 16;
+            if(x < 100000000000000000ll) return 17;
+            if(x < 1000000000000000000ll) return 18;
+            // Every remaining int64_t value lies in [10^18, INT64_MAX] and has
+            // exactly 19 digits.
+            return 19;
+        }
+
+        // Writes the decimal representation of `x` into `out`, left-padded with
+        // '0' characters so that at least nine characters are produced. No NUL
+        // terminator is written.
+        //
+        // `x` must be non-negative. A value with more than nine digits (as
+        // reported by GetNumDigits) is written without padding, so `out` must have
+        // room for max(9, digit count) characters.
+        void AppendNumberPaddedToNineDigits(char *out, int64_t x)
+        {
+            // We do all of this to avoid calling snprintf, which does a lot of crazy
+            // locale stuff. On Windows and MacOS this can get suuuuper slow
+            const int num_digits = GetNumDigits(x);
+            const int num_padding_zeros = std::max(9 - num_digits, 0);
+            std::memset(out, '0', static_cast<size_t>(num_padding_zeros));
+
+            // Digits are filled from the least significant position backwards.
+            // A do/while (not a plain while) is required so that x == 0 still
+            // emits its single '0' digit; otherwise only eight padding characters
+            // would be written and the ninth would be left uninitialized.
+            int pos = num_padding_zeros + num_digits - 1;
+            do
+            {
+                out[pos] = static_cast<char>('0' + x % 10);
+                pos -= 1;
+                x /= 10;
+            } while(x > 0);
+        }
+
+        // Builds the output schema for a table and records which columns to
+        // generate.
+        //
+        // columns:  requested column names, in output order. When empty, every
+        //           column in `name_map` is selected, placed at the position
+        //           given by its mapped index.
+        // types:    data type of each column, indexed by column index.
+        // name_map: column name -> column index.
+        // gen_list: cleared, then filled with the selected column indices in
+        //           output order.
+        //
+        // Returns the schema of the selected columns, or Status::Invalid if a
+        // requested name is not present in `name_map` (in which case `gen_list`
+        // holds the indices resolved before the failing name).
+        Result<std::shared_ptr<Schema>> SetOutputColumns(
+            const std::vector<std::string> &columns,
+            const std::vector<std::shared_ptr<DataType>> &types,
+            const std::unordered_map<std::string, int> &name_map,
+            std::vector<int> &gen_list)
+        {
+            gen_list.clear();
+            std::vector<std::shared_ptr<Field>> fields;
+            if(columns.empty())
+            {
+                fields.resize(name_map.size());
+                gen_list.resize(name_map.size());
+                // Bind by const reference: iterating by value would copy the
+                // std::string key of every map entry.
+                for(const auto &entry : name_map)
+                {
+                    const int col_idx = entry.second;
+                    fields[col_idx] = field(entry.first, types[col_idx]);
+                    gen_list[col_idx] = col_idx;
+                }
+            }
+            else
+            {
+                fields.reserve(columns.size());
+                gen_list.reserve(columns.size());
+                for(const std::string &col : columns)
+                {
+                    const auto entry = name_map.find(col);
+                    if(entry == name_map.end())
+                        return Status::Invalid("Not a valid column name");
+                    const int col_idx = entry->second;
+                    fields.push_back(field(col, types[col_idx]));
+                    gen_list.push_back(col_idx);
+                }
+            }
+            return schema(std::move(fields));
+        }
+
+        // Single TpchText instance for this translation unit (`static` gives it
+        // internal linkage).
+        // NOTE(review): presumably shared by all table generators that need
+        // comment text -- confirm against the rest of the file.
+        static TpchText g_text;
+
+        Status TpchText::InitIfNeeded(random::pcg32_fast &rng)
+        {
+            if(done_.load())
+                return Status::OK();
+
+            {
+                std::lock_guard<std::mutex> lock(text_guard_);
+                if(!text_)
+                {
+                    ARROW_ASSIGN_OR_RAISE(text_, AllocateBuffer(kTextBytes));
+                }
+            }
+            char *out = reinterpret_cast<char *>(text_->mutable_data());
+            char temp_buff[kChunkSize];

Review comment:
       I don't know the length of each sentence up front, so I have to generate 
it first in order to know how much to increment the cursor by. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to