save-buffer commented on a change in pull request #12537:
URL: https://github.com/apache/arrow/pull/12537#discussion_r823395915
##########
File path: cpp/src/arrow/compute/exec/tpch_node.cc
##########
@@ -0,0 +1,3758 @@
+#include "arrow/compute/exec/tpch_node.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/future.h"
+#include "arrow/util/unreachable.h"
+
+#include <algorithm>
+#include <bitset>
+#include <cstring>
+#include <random>
+#include <vector>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <unordered_set>
+
+namespace arrow
+{
+ using internal::checked_cast;
+
+ namespace compute
+ {
+ // Text generator interface. Judging by the member names it produces
+ // grammar-based comment strings; every member function is only declared
+ // here, so the exact behavior lives in the out-of-line definitions.
+ class TpchText
+ {
+ public:
+   // Takes the caller's random engine by reference; outcome reported as a Status.
+   Status InitIfNeeded(random::pcg32_fast &rng);
+
+   // Takes a comment count, a minimum and a maximum length, and the caller's
+   // random engine; yields a Datum on success.
+   Result<Datum> GenerateComments(size_t num_comments, size_t min_length,
+                                  size_t max_length, random::pcg32_fast &rng);
+
+ private:
+   // Compile-time size constants.
+   static constexpr int64_t kChunkSize = 8192;
+   static constexpr int64_t kTextBytes = 300 * 1024 * 1024; // 300 MB
+
+   // Word-level helpers. Each receives an offset by reference, a random
+   // engine and a character buffer, and reports a bool whose meaning is set
+   // by the out-of-line implementation.
+   bool GenerateWord(int64_t &offset, random::pcg32_fast &rng, char *arr,
+                     const char **words, size_t num_choices);
+   bool GenerateNoun(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GenerateVerb(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GenerateAdjective(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GenerateAdverb(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GeneratePreposition(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GenerateAuxiliary(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GenerateTerminator(int64_t &offset, random::pcg32_fast &rng, char *arr);
+
+   // Phrase-level helpers, same parameter shape as the word-level ones.
+   bool GenerateNounPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GenerateVerbPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr);
+   bool GeneratePrepositionalPhrase(int64_t &offset, random::pcg32_fast &rng, char *arr);
+
+   // Sentence-level helper, same parameter shape again.
+   bool GenerateSentence(int64_t &offset, random::pcg32_fast &rng, char *arr);
+
+   // State. Declaration order of these members is kept as in the patch.
+   std::atomic<bool> done_{false};
+   int64_t generated_offset_{0};
+   std::mutex text_guard_;
+   std::unique_ptr<Buffer> text_;
+   random::pcg32_fast rng_;
+ };
+
+ // Abstract base class for table generators: Init, StartProducing and
+ // schema are pure virtual and must be supplied by a derived class, while
+ // Abort and the two atomic members are provided here.
+ class TpchTableGenerator
+ {
+ public:
+   // Callback signatures used throughout this interface.
+   using OutputBatchCallback = std::function<void(ExecBatch)>;
+   using FinishedCallback = std::function<void(int64_t)>;
+   using GenerateFn = std::function<Status(size_t)>;
+   using ScheduleCallback = std::function<Status(GenerateFn)>;
+   using AbortCallback = std::function<void()>;
+
+   // Receives the column names, a scale factor and a batch size.
+   virtual Status Init(std::vector<std::string> columns, int scale_factor,
+                       int64_t batch_size) = 0;
+
+   // Receives a thread count plus the output, finished and schedule callbacks.
+   virtual Status StartProducing(size_t num_threads,
+                                 OutputBatchCallback output_callback,
+                                 FinishedCallback finished_callback,
+                                 ScheduleCallback schedule_callback) = 0;
+
+   // Atomically switches done_ from false to true and, only if this call
+   // performed the switch, invokes abort_callback. A call that finds done_
+   // already true does nothing.
+   void Abort(AbortCallback abort_callback)
+   {
+     bool was_running = false;
+     const bool flipped = done_.compare_exchange_strong(was_running, true);
+     if(!flipped)
+     {
+       return;
+     }
+     abort_callback();
+   }
+
+   virtual std::shared_ptr<Schema> schema() const = 0;
+
+   virtual ~TpchTableGenerator() = default;
+
+ protected:
+   // Shared with derived classes; both start at their zero value.
+   std::atomic<bool> done_{false};
+   std::atomic<int64_t> batches_generated_{0};
+ };
+
+ int GetNumDigits(int64_t x)
+ {
+ // This if statement chain is for MAXIMUM SPEED
Review comment:
Bisection was an interesting idea. I profiled it, and it turned out to be
slower than the linear chain of if statements. I'm not quite sure why that is,
but I'd guess it has something to do with being harder for the branch predictor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]