This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit 1e95020fec3548c56b9c08776d451a85d782f81f Author: 翊名 <[email protected]> AuthorDate: Mon Nov 25 16:53:18 2019 +0800 feat(PullConsumer) add seek to begin/end support --- .../apache/rocketmq/ons/api/PropertyKeyConst.java | 1 + .../ons/api/impl/rocketmq/PullConsumerImpl.java | 44 +++++++++------------- ons-core/pom.xml | 2 +- 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java index b3d2670..d968264 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java @@ -79,6 +79,7 @@ public class PropertyKeyConst implements OMSBuiltinKeys { public static final String MsgTraceSwitch = "msgTraceSwitch"; + public static final String AUTO_COMMIT = "autoCommit"; } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java index 4d625a0..37678bd 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java @@ -105,34 +105,14 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer this.litePullConsumer.setPullThresholdSizeForQueue(maxCachedMessageSizeInMiB); } -// String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch); -// if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) { -// LOGGER.info("MQ Client Disable the Trace Hook!"); -// } else { -// try { -// Properties tempProperties = new Properties(); -// tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); -// tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); -// tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); -// tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); -// tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); -// tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); -// tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); -// tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name()); -// AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); -// dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl()); -// traceDispatcher = dispatcher; -// this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( -// new OnsConsumeMessageHookImpl(traceDispatcher)); -// } catch (Throwable e) { -// LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e); -// } -// } - + String autoCommit = properties.getProperty(PropertyKeyConst.AUTO_COMMIT); + if (!UtilAll.isBlank(autoCommit)) { + this.litePullConsumer.setAutoCommit(Boolean.valueOf(autoCommit)); + } } @Override protected void updateNameServerAddr(String nameServerAddresses) { - //TODO + this.litePullConsumer.updateNameServerAddress(nameServerAddresses); } private Set<TopicPartition> convertToTopicPartitions(Collection<MessageQueue> messageQueues) { @@ -233,11 +213,21 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer } @Override public void seekToBeginning(TopicPartition topicPartition) { - //TODO + try { + this.litePullConsumer.seekToBegin(convertToMessageQueue(topicPartition)); + } catch (MQClientException ex) { + LOGGER.warn("Topic partition: {} seek to beginning error", topicPartition, ex); + throw new ONSClientException("Seek offset to beginning failed"); + } } @Override public void seekToEnd(TopicPartition topicPartition) { - //TODO + try { + this.litePullConsumer.seekToEnd(convertToMessageQueue(topicPartition)); + } catch (MQClientException ex) { + LOGGER.warn("Topic partition: {} seek to end error", topicPartition, ex); + throw new ONSClientException("Seek offset to end failed"); + } } diff --git a/ons-core/pom.xml b/ons-core/pom.xml index 62851ac..cd82803 100644 --- a/ons-core/pom.xml +++ b/ons-core/pom.xml @@ -44,7 +44,7 @@ <java_target_version>1.8</java_target_version> <file_encoding>UTF-8</file_encoding> <!-- Always use stable version of RocketMQ --> - <rocketmq.version>4.6.0</rocketmq.version> + <rocketmq.version>4.6.1-SNAPSHOT</rocketmq.version> <auth.version>${project.version}</auth.version> <spring.version>4.1.2.RELEASE</spring.version> <diamond.version>3.7.4</diamond.version>
