This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new e617a18 [TUBEMQ-416] support consume from specified position (#319)
e617a18 is described below
commit e617a1827d65e1e6ccaeead8ce06de2310ef819b
Author: dockerzhang <[email protected]>
AuthorDate: Fri Nov 20 19:09:32 2020 +0800
[TUBEMQ-416] support consume from specified position (#319)
Co-authored-by: dockerzhang <[email protected]>
---
tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc | 7 +++++++
.../tubemq-client-python/src/python/tubemq/client.py | 4 +++-
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc b/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
index 6cb38d8..32b95db 100644
--- a/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
+++ b/tubemq-client-twins/tubemq-client-python/src/cpp/tubemq_config.cc
@@ -29,6 +29,12 @@ using std::set;
using std::string;
PYBIND11_MODULE(tubemq_config, m) {
+ py::enum_<ConsumePosition>(m, "ConsumePosition")
+ .value("kConsumeFromFirstOffset", ConsumePosition::kConsumeFromFirstOffset)
+ .value("kConsumeFromLatestOffset", ConsumePosition::kConsumeFromLatestOffset)
+ .value("kComsumeFromMaxOffsetAlways", ConsumePosition::kComsumeFromMaxOffsetAlways)
+ .export_values();
+
py::class_<ConsumerConfig>(m, "ConsumerConfig")
.def(py::init<>())
.def("setRpcReadTimeoutMs", &ConsumerConfig::SetRpcReadTimeoutMs)
@@ -38,5 +44,6 @@ PYBIND11_MODULE(tubemq_config, m) {
.def("setGroupConsumeTarget", static_cast<bool (ConsumerConfig::*)\
(string&, const string&, const map<string,
set<string>>&)>(&ConsumerConfig::SetGroupConsumeTarget), "")
.def("getRpcReadTimeoutMs", &ConsumerConfig::GetRpcReadTimeoutMs)
+ .def("setConsumePosition", &ConsumerConfig::SetConsumePosition)
.def("getMasterAddrInfo", &ConsumerConfig::GetMasterAddrInfo);
}
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
index f29f088..f0cf21e 100644
--- a/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
+++ b/tubemq-client-twins/tubemq-client-python/src/python/tubemq/client.py
@@ -31,12 +31,14 @@ class consumer(tubemq_client.TubeMQConsumer):
group_name,
topic_list,
RpcReadTimeoutMs=20000,
- conf_file=os.path.join(os.path.dirname(__file__),'client.conf')):
+ consume_osition=tubemq_config.ConsumePosition.kConsumeFromLatestOffset,
+ conf_file=os.path.join(os.path.dirname(__file__), 'client.conf')):
super(consumer, self).__init__()
consumer_config = tubemq_config.ConsumerConfig()
consumer_config.setRpcReadTimeoutMs(RpcReadTimeoutMs)
+ consumer_config.setConsumePosition(consume_osition)
err_info = ''
result = consumer_config.setMasterAddrInfo(err_info, master_addr)