This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch fifo_opt
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/fifo_opt by this push:
     new ebf2eb9e fix: prepare to debug
ebf2eb9e is described below

commit ebf2eb9e2bc07e0a1000930ef52526930a7ac5cc
Author: Li Zhanhui <[email protected]>
AuthorDate: Sun Apr 14 21:33:42 2024 +0800

    fix: prepare to debug
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 cpp/examples/CMakeLists.txt          |   1 +
 cpp/examples/ExampleFifoProducer.cpp | 193 +++++++++++++++++++++++++++++++++++
 2 files changed, 194 insertions(+)

diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8d6b0399..27304477 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -4,6 +4,7 @@ function(add_example name file)
 endfunction()
 
 add_example(example_producer ExampleProducer.cpp)
+add_example(example_fifo_producer ExampleFifoProducer.cpp)
 add_example(example_producer_with_async ExampleProducerWithAsync.cpp)
 add_example(example_producer_with_fifo_message 
ExampleProducerWithFifoMessage.cpp)
 add_example(example_producer_with_timed_message 
ExampleProducerWithTimedMessage.cpp)
diff --git a/cpp/examples/ExampleFifoProducer.cpp 
b/cpp/examples/ExampleFifoProducer.cpp
new file mode 100644
index 00000000..ff3a14db
--- /dev/null
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <algorithm>
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <memory>
+#include <random>
+#include <string>
+#include <system_error>
+
+#include "gflags/gflags.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/FifoProducer.h"
+#include "rocketmq/Logger.h"
+#include "rocketmq/Message.h"
+#include "rocketmq/Producer.h"
+#include "rocketmq/SendReceipt.h"
+
+using namespace ROCKETMQ_NAMESPACE;
+
+/**
+ * @brief A simple Semaphore to limit request concurrency.
+ */
+class Semaphore {
+public:
+  Semaphore(std::size_t permits) : permits_(permits) {
+  }
+
+  /**
+   * @brief Acquire a permit.
+   */
+  void acquire() {
+    while (true) {
+      std::unique_lock<std::mutex> lk(mtx_);
+      if (permits_ > 0) {
+        permits_--;
+        return;
+      }
+      cv_.wait(lk, [this]() { return permits_ > 0; });
+    }
+  }
+
+  /**
+   * @brief Release the permit back to semaphore.
+   */
+  void release() {
+    std::unique_lock<std::mutex> lk(mtx_);
+    permits_++;
+    if (1 == permits_) {
+      cv_.notify_one();
+    }
+  }
+
+private:
+  std::size_t permits_{0};
+  std::mutex mtx_;
+  std::condition_variable cv_;
+};
+
+const std::string& alphaNumeric() {
+  static std::string 
alpha_numeric("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+  return alpha_numeric;
+}
+
+std::string randomString(std::string::size_type len) {
+  std::string result;
+  result.reserve(len);
+  std::random_device rd;
+  std::mt19937 generator(rd());
+  std::string source(alphaNumeric());
+  std::string::size_type generated = 0;
+  while (generated < len) {
+    std::shuffle(source.begin(), source.end(), generator);
+    std::string::size_type delta = std::min({len - generated, 
source.length()});
+    result.append(source.substr(0, delta));
+    generated += delta;
+  }
+  return result;
+}
+
+DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are 
published");
+DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, 
provided by your service provider");
+DEFINE_int32(message_body_size, 4096, "Message body size");
+DEFINE_uint32(total, 256, "Number of sample messages to publish");
+DEFINE_string(access_key, "", "Your access key ID");
+DEFINE_string(access_secret, "", "Your access secret");
+DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
+DEFINE_uint32(concurrency, 16, "Concurrency of FIFO producer");
+
+int main(int argc, char* argv[]) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  auto& logger = getLogger();
+  logger.setConsoleLevel(Level::Info);
+  logger.setLevel(Level::Info);
+  logger.init();
+
+  // Access Key/Secret pair may be acquired from management console
+  CredentialsProviderPtr credentials_provider;
+  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
+    credentials_provider = 
std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, 
FLAGS_access_secret);
+  }
+
+  // In most case, you don't need to create too many producers, singleton 
pattern is recommended.
+  auto producer = FifoProducer::newBuilder()
+                      .withConfiguration(Configuration::newBuilder()
+                                             .withEndpoints(FLAGS_access_point)
+                                             
.withCredentialsProvider(credentials_provider)
+                                             .withSsl(FLAGS_tls)
+                                             .build())
+                      .withConcurrency(FLAGS_concurrency)
+                      .withTopics({FLAGS_topic})
+                      .build();
+
+  std::atomic_bool stopped;
+  std::atomic_long count(0);
+
+  auto stats_lambda = [&] {
+    while (!stopped.load(std::memory_order_relaxed)) {
+      long cnt = count.load(std::memory_order_relaxed);
+      while (!count.compare_exchange_weak(cnt, 0)) {
+        cnt = count.load(std::memory_order_relaxed);
+      }
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      std::cout << "QPS: " << cnt << std::endl;
+    }
+  };
+
+  std::thread stats_thread(stats_lambda);
+
+  std::string body = randomString(FLAGS_message_body_size);
+
+  std::size_t completed = 0;
+  std::mutex mtx;
+  std::condition_variable cv;
+
+  std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency));
+
+  try {
+    for (std::size_t i = 0; i < FLAGS_total; ++i) {
+      auto message = Message::newBuilder()
+                         .withTopic(FLAGS_topic)
+                         .withTag("TagA")
+                         .withKeys({"Key-" + std::to_string(i)})
+                         .withGroup("message-group" + std::to_string(i % 
FLAGS_concurrency))
+                         .withBody(body)
+                         .build();
+      std::error_code ec;
+      auto callback = [&](const std::error_code& ec, const SendReceipt& 
receipt) mutable {
+        completed++;
+        count++;
+        semaphore->release();
+
+        if (completed >= FLAGS_total) {
+          cv.notify_all();
+        }
+      };
+
+      semaphore->acquire();
+      producer.send(std::move(message), callback);
+    }
+  } catch (...) {
+    std::cerr << "Ah...No!!!" << std::endl;
+  }
+
+  {
+    std::unique_lock<std::mutex> lk(mtx);
+    cv.wait(lk, [&]() { return completed >= FLAGS_total; });
+    std::cout << "Completed: " << completed << ", total: " << FLAGS_total << 
std::endl;
+  }
+
+  stopped.store(true, std::memory_order_relaxed);
+  if (stats_thread.joinable()) {
+    stats_thread.join();
+  }
+
+  return EXIT_SUCCESS;
+}

Reply via email to