luchunliang commented on a change in pull request #2142:
URL: https://github.com/apache/incubator-inlong/pull/2142#discussion_r784449274



##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/stat/SortClientStateCounter.java
##########
@@ -236,12 +236,55 @@ public SortClientStateCounter 
addRequestManagerCommonErrorTimes(long num) {
     /**
      * count manager result param error times
      *
-     * @param num int
-     * @return SortClientStateCounter
+     * @param num long
+     * @return {@link SortClientStateCounter}
      */
     public SortClientStateCounter addRequestManagerParamErrorTimes(long num) {
         count.getAndAdd(15, num);
         return this;
     }
 
+    /**
+     * count thread fetch times
+     *
+     * @param num long
+     * @return {@link SortClientStateCounter}
+     */
+    public SortClientStateCounter addFetchTimes(long num) {
+        count.getAndAdd(16, num);

Review comment:
       magic number:16

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;

Review comment:
       Variable name is not fit with method name.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InLongTopicFetcher.java
##########
@@ -32,7 +29,7 @@ public InLongTopicFetcher(InLongTopic inLongTopic, 
ClientContext context) {
         this.context = context;
     }
 
-    public abstract boolean init(PulsarClient pulsarClient);
+    public abstract boolean init(Object client);

Review comment:
       client can use Java Generics.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * create Consumer and fetch thread
+     *
+     * @return boolean
+     */
+    @Override
+    public boolean init(Object object) {
+        PulsarClient pulsarClient = (PulsarClient) object;
+        return createConsumer(pulsarClient);
+    }
+
+    private boolean createConsumer(PulsarClient client) {
+        try {
+            consumer = client.newConsumer(Schema.BYTES)
+                    .topic(inLongTopic.getTopic())
+                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionType(SubscriptionType.Shared)
+                    .startMessageIdInclusive()
+                    .ackTimeout(10, TimeUnit.SECONDS)
+                    
.receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
+                    .subscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchThread = new Thread(new Fetcher(), threadName);
+            this.fetchThread.start();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * isValidState
+     */
+    public void isValidState() {
+        if (closed) {
+            throw new IllegalStateException(inLongTopic + " closed.");
+        }
+    }
+
+    /**
+     * pause
+     */
+    @Override
+    public void pause() {
+        if (consumer != null) {
+            consumer.pause();
+        }
+    }
+
+    /**
+     * resume
+     */
+    @Override
+    public void resume() {
+        if (consumer != null) {
+            consumer.resume();
+        }
+    }
+
+    /**
+     * close
+     *
+     * @return true/false
+     */
+    @Override
+    public boolean close() {
+        mainLock.writeLock().lock();
+        try {
+            this.closed = true;
+            try {
+                if (consumer != null) {
+                    consumer.close();
+                }
+                if (fetchThread != null) {
+                    fetchThread.interrupt();
+                }
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
+
+            logger.info("closed {}", inLongTopic);
+            return true;
+        } finally {
+            mainLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public class Fetcher implements Runnable {
+
+        /**
+         * put the received msg to onFinished method
+         *
+         * @param messageRecords {@link List<MessageRecord>}
+         */
+        private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
+            long start = System.currentTimeMillis();
+            try {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                        .addCallbackTimes(1);
+                
context.getConfig().getCallback().onFinishedBatch(messageRecords);
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                        .addCallbackTimeCost(System.currentTimeMillis() - 
start).addCallbackDoneTimes(1);
+            } catch (Exception e) {
+                context.getStatManager()
+                        .getStatistics(context.getConfig().getSortTaskId(),
+                                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                        .addCallbackErrorTimes(1);
+                e.printStackTrace();
+            }
+        }
+
+        private String getOffset(MessageId msgId) {
+            return Base64.getEncoder().encodeToString(msgId.toByteArray());
+        }
+
+        @Override
+        public void run() {
+            boolean hasPermit;
+            while (true) {
+                hasPermit = false;
+                try {
+                    if (context.getConfig().isStopConsume() || stopConsume) {
+                        TimeUnit.MILLISECONDS.sleep(50);
+                        continue;
+                    }
+
+                    if (sleepTime > 0) {
+                        TimeUnit.MILLISECONDS.sleep(sleepTime);
+                    }
+
+                    context.acquireRequestPermit();
+                    hasPermit = true;
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addMsgCount(1).addFetchTimes(1);
+
+                    long startFetchTime = System.currentTimeMillis();
+                    Messages<byte[]> messages = consumer.batchReceive();
+                    context.getStatManager()
+                            .getStatistics(context.getConfig().getSortTaskId(),
+                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                            .addFetchTimeCost(System.currentTimeMillis() - 
startFetchTime);

Review comment:
       In a loop turn, it is repeat invoking for "System.currentTimeMillis()".

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;

Review comment:
       It is better that boolean variable has "is" prefix.

##########
File path: 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java
##########
@@ -47,19 +46,45 @@
      * @param sortId The sortId of fetched message.
      * @param message Message that fetched from upstream data storage.
      */
+
     private SubscribeFetchResult(
             final String sortId,
             final MessageRecord message) {
         this.sortId = sortId;
         this.headers.put(Constants.HEADER_KEY_MESSAGE_KEY, 
message.getMsgKey());
         this.headers.put(Constants.HEADER_KEY_MSG_OFFSET, message.getOffset());
         this.headers.put(Constants.HEADER_KEY_MSG_TIME, 
String.valueOf(message.getRecTime()));
-        this.headers.putAll(message.getMsgHeader());
-        this.body = message.getMessage();
+        //TODO to fix here

Review comment:
       ditto

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;

Review comment:
       Is it not finished? It is better to add a TODO comment.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/stat/SortClientStateCounter.java
##########
@@ -236,12 +236,55 @@ public SortClientStateCounter 
addRequestManagerCommonErrorTimes(long num) {
     /**
      * count manager result param error times
      *
-     * @param num int
-     * @return SortClientStateCounter
+     * @param num long
+     * @return {@link SortClientStateCounter}
      */
     public SortClientStateCounter addRequestManagerParamErrorTimes(long num) {
         count.getAndAdd(15, num);
         return this;
     }
 
+    /**
+     * count thread fetch times
+     *
+     * @param num long
+     * @return {@link SortClientStateCounter}
+     */
+    public SortClientStateCounter addFetchTimes(long num) {
+        count.getAndAdd(16, num);
+        return this;
+    }
+
+    /**
+     * count fetch error times
+     *
+     * @param num long
+     * @return {@link SortClientStateCounter}
+     */
+    public SortClientStateCounter addFetchErrorTimes(long num) {
+        count.getAndAdd(17, num);

Review comment:
       ditto

##########
File path: pom.xml
##########
@@ -95,10 +97,10 @@
         <module>inlong-agent</module>
         <module>inlong-manager</module>
         <module>inlong-sort</module>
+        <module>inlong-sdk</module>

Review comment:
       In fucture, inlong-sort module will use inlong-sdk module too, please 
upper priority of inlong-sdk.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;

Review comment:
       Is it not finished? It is better to add a TODO comment.

##########
File path: 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
##########
@@ -100,10 +102,17 @@ public void setClient(@NotNull SortClient client) {
     public void onFinished(final MessageRecord messageRecord) {
         try {
             Preconditions.checkState(messageRecord != null, "Fetched msg is 
null.");
-            final SubscribeFetchResult result = 
SubscribeFetchResult.Factory.create(sortId, messageRecord);
-            final ProfileEvent profileEvent = new 
ProfileEvent(result.getBody(), result.getHeaders());
-            channelProcessor.processEvent(profileEvent);
-            context.reportToMetric(profileEvent, sortId, "-", 
SortSdkSourceContext.FetchResult.SUCCESS);
+            for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
+                //TODO fix here

Review comment:
       The description about TODO is not clear.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * create Consumer and fetch thread
+     *
+     * @return boolean
+     */
+    @Override
+    public boolean init(Object object) {
+        PulsarClient pulsarClient = (PulsarClient) object;
+        return createConsumer(pulsarClient);
+    }
+
+    private boolean createConsumer(PulsarClient client) {
+        try {
+            consumer = client.newConsumer(Schema.BYTES)
+                    .topic(inLongTopic.getTopic())
+                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionType(SubscriptionType.Shared)
+                    .startMessageIdInclusive()
+                    .ackTimeout(10, TimeUnit.SECONDS)
+                    
.receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
+                    .subscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchThread = new Thread(new Fetcher(), threadName);
+            this.fetchThread.start();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * isValidState
+     */
+    public void isValidState() {
+        if (closed) {
+            throw new IllegalStateException(inLongTopic + " closed.");
+        }
+    }
+
+    /**
+     * pause
+     */
+    @Override
+    public void pause() {
+        if (consumer != null) {
+            consumer.pause();
+        }
+    }
+
+    /**
+     * resume
+     */
+    @Override
+    public void resume() {
+        if (consumer != null) {
+            consumer.resume();
+        }
+    }
+
+    /**
+     * close
+     *
+     * @return true/false
+     */
+    @Override
+    public boolean close() {
+        mainLock.writeLock().lock();
+        try {
+            this.closed = true;
+            try {
+                if (consumer != null) {
+                    consumer.close();
+                }
+                if (fetchThread != null) {
+                    fetchThread.interrupt();
+                }
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
+
+            logger.info("closed {}", inLongTopic);
+            return true;
+        } finally {
+            mainLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public class Fetcher implements Runnable {

Review comment:
       Fetcher runnable invoke InLongPulsarFetcherImpl method.
   It is not cohesion.
   Two classes can change to one.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;

Review comment:
       How to process this exception when sort invoke acking method?
   It is better that Sort-sdk process this exception.

##########
File path: 
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/pulsar/InLongPulsarFetcherImpl.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.impl.pulsar;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InLongTopicFetcher;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongPulsarFetcherImpl extends InLongTopicFetcher {
+
+    private final Logger logger = 
LoggerFactory.getLogger(InLongPulsarFetcherImpl.class);
+    private final ReentrantReadWriteLock mainLock = new 
ReentrantReadWriteLock(true);
+    private final ConcurrentHashMap<String, MessageId> offsetCache = new 
ConcurrentHashMap<>();
+    private volatile boolean closed = false;
+    private Consumer<byte[]> consumer;
+    private volatile boolean stopConsume = false;
+    private volatile Thread fetchThread;
+    private long sleepTime = 0L;
+    private int emptyPollTimes = 0;
+
+    public InLongPulsarFetcherImpl(InLongTopic inLongTopic,
+            ClientContext context) {
+        super(inLongTopic, context);
+    }
+
+    @Override
+    public void stopConsume(boolean stopConsume) {
+        this.stopConsume = stopConsume;
+    }
+
+    @Override
+    public boolean isConsumeStop() {
+        return stopConsume;
+    }
+
+    @Override
+    public InLongTopic getInLongTopic() {
+        return inLongTopic;
+    }
+
+    @Override
+    public long getConsumedDataSize() {
+        return 0;
+    }
+
+    @Override
+    public long getAckedOffset() {
+        return 0;
+    }
+
+    private void ackSucc(String offset) {
+        offsetCache.remove(offset);
+        
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckSuccTimes(1);
+    }
+
+    /**
+     * ack Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        if (!StringUtils.isEmpty(msgOffset)) {
+            try {
+                if (consumer == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("consumer == null");
+                    return;
+                }
+                MessageId messageId = offsetCache.get(msgOffset);
+                if (messageId == null) {
+                    
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                            inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic())
+                            .addAckFailTimes(1);
+                    logger.error("messageId == null");
+                    return;
+                }
+                consumer.acknowledgeAsync(messageId)
+                        .thenAccept(consumer -> ackSucc(msgOffset))
+                        .exceptionally(exception -> {
+                            logger.error("ack fail:{}", msgOffset);
+                            
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                                    
inLongTopic.getInLongCluster().getClusterId(), inLongTopic.getTopic())
+                                    .addAckFailTimes(1);
+                            return null;
+                        });
+            } catch (Exception e) {
+                
context.getStatManager().getStatistics(context.getConfig().getSortTaskId(),
+                        inLongTopic.getInLongCluster().getClusterId(), 
inLongTopic.getTopic()).addAckFailTimes(1);
+                logger.error(e.getMessage(), e);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * create Consumer and fetch thread
+     *
+     * @return boolean
+     */
+    @Override
+    public boolean init(Object object) {
+        PulsarClient pulsarClient = (PulsarClient) object;
+        return createConsumer(pulsarClient);
+    }
+
+    private boolean createConsumer(PulsarClient client) {
+        try {
+            consumer = client.newConsumer(Schema.BYTES)
+                    .topic(inLongTopic.getTopic())
+                    .subscriptionName(context.getConfig().getSortTaskId())
+                    .subscriptionType(SubscriptionType.Shared)
+                    .startMessageIdInclusive()
+                    .ackTimeout(10, TimeUnit.SECONDS)
+                    
.receiverQueueSize(context.getConfig().getPulsarReceiveQueueSize())
+                    .subscribe();
+
+            String threadName = "sort_sdk_fetch_thread_" + 
StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS");
+            this.fetchThread = new Thread(new Fetcher(), threadName);

Review comment:
       It is better solution to use a thread pool.

##########
File path: 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
##########
@@ -113,6 +122,16 @@ public void onFinished(final MessageRecord messageRecord) {
         }
     }
 
+    /**
+     * The callback function that SortSDK invoke when fetch messages batch
+     *
+     * @param messageRecord {@link List<MessageRecord>}
+     */
+    @Override
+    public void onFinishedBatch(List<MessageRecord> messageRecord) {
+        //TODO

Review comment:
       The description about TODO is not clear.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to