This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit 1e95020fec3548c56b9c08776d451a85d782f81f
Author: 翊名 <[email protected]>
AuthorDate: Mon Nov 25 16:53:18 2019 +0800

    feat(PullConsumer) add seek to begin/end support
---
 .../apache/rocketmq/ons/api/PropertyKeyConst.java  |  1 +
 .../ons/api/impl/rocketmq/PullConsumerImpl.java    | 44 +++++++++-------------
 ons-core/pom.xml                                   |  2 +-
 3 files changed, 19 insertions(+), 28 deletions(-)

diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java
index b3d2670..d968264 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/PropertyKeyConst.java
@@ -79,6 +79,7 @@ public class PropertyKeyConst implements OMSBuiltinKeys {
 
     public static final String MsgTraceSwitch = "msgTraceSwitch";
 
+    public static final String AUTO_COMMIT = "autoCommit";
 
 
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
index 4d625a0..37678bd 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
@@ -105,34 +105,14 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
             
this.litePullConsumer.setPullThresholdSizeForQueue(maxCachedMessageSizeInMiB);
         }
 
-//        String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch);
-//        if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) {
-//            LOGGER.info("MQ Client Disable the Trace Hook!");
-//        } else {
-//            try {
-//                Properties tempProperties = new Properties();
-//                tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
-//                tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
-//                tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
-//                tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
-//                tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
-//                tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
-//                tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
-//                tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
-//                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
-//                dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
-//                traceDispatcher = dispatcher;
-//                this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
-//                    new OnsConsumeMessageHookImpl(traceDispatcher));
-//            } catch (Throwable e) {
-//                LOGGER.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
-//            }
-//        }
-
+        String autoCommit = properties.getProperty(PropertyKeyConst.AUTO_COMMIT);
+        if (!UtilAll.isBlank(autoCommit)) {
+            this.litePullConsumer.setAutoCommit(Boolean.valueOf(autoCommit));
+        }
     }
 
     @Override protected void updateNameServerAddr(String nameServerAddresses) {
-        //TODO
+        this.litePullConsumer.updateNameServerAddress(nameServerAddresses);
     }
 
     private Set<TopicPartition> convertToTopicPartitions(Collection<MessageQueue> messageQueues) {
@@ -233,11 +213,21 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
     }
 
     @Override public void seekToBeginning(TopicPartition topicPartition) {
-        //TODO
+        try {
+            this.litePullConsumer.seekToBegin(convertToMessageQueue(topicPartition));
+        } catch (MQClientException ex) {
+            LOGGER.warn("Topic partition: {} seek to beginning error", topicPartition, ex);
+            throw new ONSClientException("Seek offset to beginning failed");
+        }
     }
 
     @Override public void seekToEnd(TopicPartition topicPartition) {
-        //TODO
+        try {
+            this.litePullConsumer.seekToEnd(convertToMessageQueue(topicPartition));
+        } catch (MQClientException ex) {
+            LOGGER.warn("Topic partition: {} seek to end error", topicPartition, ex);
+            throw new ONSClientException("Seek offset to end failed");
+        }
 
     }
 
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index 62851ac..cd82803 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -44,7 +44,7 @@
         <java_target_version>1.8</java_target_version>
         <file_encoding>UTF-8</file_encoding>
         <!-- Always use stable version of RocketMQ -->
-        <rocketmq.version>4.6.0</rocketmq.version>
+        <rocketmq.version>4.6.1-SNAPSHOT</rocketmq.version>
         <auth.version>${project.version}</auth.version>
         <spring.version>4.1.2.RELEASE</spring.version>
         <diamond.version>3.7.4</diamond.version>

Reply via email to