rtpsw commented on code in PR #35874:
URL: https://github.com/apache/arrow/pull/35874#discussion_r1225577069
##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1360,6 +1361,102 @@ TRACED_TEST(AsofJoinTest, TestUnorderedOnKey, {
schema({field("time", int64()), field("key", int32()), field("r0_v0",
float64())}));
})
+struct BackpressureCounters {
+ std::atomic<int32_t> pause_count = 0;
+ std::atomic<int32_t> resume_count = 0;
+};
+
+struct BackpressureCountingNodeOptions : public ExecNodeOptions {
+ BackpressureCountingNodeOptions(BackpressureCounters* counters) :
counters(counters) {}
+
+ BackpressureCounters* counters;
+};
+
+struct BackpressureCountingNode : public MapNode {
+ static constexpr const char* kKindName = "BackpressureCountingNode";
+ static constexpr const char* kFactoryName = "backpressure_count";
+
+ static void Register() {
+ auto exec_reg = default_exec_factory_registry();
+ if (!exec_reg->GetFactory(kFactoryName).ok()) {
+ ASSERT_OK(exec_reg->AddFactory(kFactoryName,
BackpressureCountingNode::Make));
+ }
+ }
+
+ BackpressureCountingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::shared_ptr<Schema> output_schema,
+ const BackpressureCountingNodeOptions& options)
+ : MapNode(plan, inputs, output_schema), counters(options.counters) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+ auto bp_options = static_cast<const
BackpressureCountingNodeOptions&>(options);
+ return plan->EmplaceNode<BackpressureCountingNode>(
+ plan, inputs, inputs[0]->output_schema(), bp_options);
+ }
+
+ const char* kind_name() const override { return kKindName; }
+ Result<ExecBatch> ProcessBatch(ExecBatch batch) override { return batch; }
+
+ void PauseProducing(ExecNode* output, int32_t counter) override {
+ ++counters->pause_count;
+ inputs()[0]->PauseProducing(this, counter);
+ }
+ void ResumeProducing(ExecNode* output, int32_t counter) override {
+ ++counters->resume_count;
+ inputs()[0]->ResumeProducing(this, counter);
+ }
+
+ BackpressureCounters* counters;
+};
+
+struct BackpressureDelayingNodeOptions : public ExecNodeOptions {
+ BackpressureDelayingNodeOptions(double delay_seconds, std::function<bool()>
gate)
+ : delay_seconds(delay_seconds), gate(gate) {}
+
+ double delay_seconds;
+ std::function<bool()> gate;
+};
+
+struct BackpressureDelayingNode : public MapNode {
+ static constexpr auto kKindName = "BackpressureDelayingNode";
+ static constexpr const char* kFactoryName = "backpressure_delay";
+
+ static void Register() {
+ auto exec_reg = default_exec_factory_registry();
+ if (!exec_reg->GetFactory(kFactoryName).ok()) {
+ ASSERT_OK(exec_reg->AddFactory(kFactoryName,
BackpressureDelayingNode::Make));
+ }
+ }
+
+ BackpressureDelayingNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ std::shared_ptr<Schema> output_schema,
+ const BackpressureDelayingNodeOptions& options)
+ : MapNode(plan, inputs, output_schema),
+ gate(options.gate),
+ delay_seconds(options.delay_seconds) {}
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, kKindName));
+ auto bp_options = static_cast<const
BackpressureDelayingNodeOptions&>(options);
+ return plan->EmplaceNode<BackpressureDelayingNode>(
+ plan, inputs, inputs[0]->output_schema(), bp_options);
+ }
+
+ const char* kind_name() const override { return kKindName; }
+ Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
+ while (!gate()) {
+ SleepFor(delay_seconds);
Review Comment:
I went with `SleepABit`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]