dongeforever commented on code in PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#discussion_r853767970


##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -243,113 +247,133 @@ class AsyncRunnable implements Runnable {
         @Override
         public void run() {
             while (!stopped) {
-                List<TraceContext> contexts = new 
ArrayList<TraceContext>(batchSize);
                 synchronized (traceContextQueue) {
-                    for (int i = 0; i < batchSize; i++) {
-                        TraceContext context = null;
+                    long endTime = System.currentTimeMillis() + pollingTimeMil;
+                    while (System.currentTimeMillis() < endTime) {
                         try {
-                            //get trace data element from blocking Queue - 
traceContextQueue
-                            context = traceContextQueue.poll(5, 
TimeUnit.MILLISECONDS);
-                        } catch (InterruptedException e) {
-                        }
-                        if (context != null) {
-                            contexts.add(context);
-                        } else {
-                            break;
+                            TraceContext traceContext = traceContextQueue.poll(
+                                    endTime - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS
+                            );
+
+                            if (traceContext != null && 
!traceContext.getTraceBeans().isEmpty()) {
+                                // get the topic which the trace message will 
send to
+                                String traceTopicName = 
this.getTraceTopicName(traceContext.getRegionId());
+
+                                // get the traceDataSegment which will save 
this trace message, create if null
+                                TraceDataSegment traceDataSegment = 
taskQueueByTopic.get(traceTopicName);
+                                if (traceDataSegment == null) {
+                                    traceDataSegment = new 
TraceDataSegment(traceContext.getRegionId());
+                                    taskQueueByTopic.put(traceTopicName, 
traceDataSegment);
+                                }
+
+                                // encode traceContext and save it into 
traceDataSegment
+                                // NOTE if data size in traceDataSegment more 
than maxMsgSize,
+                                //  a AsyncDataSendTask will be created and 
submitted
+                                TraceTransferBean traceTransferBean = 
TraceDataEncoder.encoderFromContextBean(traceContext);
+                                
traceDataSegment.addTraceTransferBean(traceTransferBean);
+                            }
+                        } catch (InterruptedException ignore) {
+                            log.debug("traceContextQueue#poll exception");
                         }
                     }
-                    if (contexts.size() > 0) {
-                        AsyncAppenderRequest request = new 
AsyncAppenderRequest(contexts);
-                        traceExecutor.submit(request);
-                    } else if (AsyncTraceDispatcher.this.stopped) {
+
+                    // NOTE send the data in traceDataSegment which the first 
TraceTransferBean
+                    //  is longer than waitTimeThreshold
+                    sendDataByTimeThreshold();
+
+                    if (AsyncTraceDispatcher.this.stopped) {
                         this.stopped = true;
                     }
                 }
             }
 
         }
-    }
 
-    class AsyncAppenderRequest implements Runnable {
-        List<TraceContext> contextList;
+        private void sendDataByTimeThreshold() {
+            long now = System.currentTimeMillis();
+            for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+                if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
+                    taskInfo.sendAllData();
+                }
+            }
+        }
 
-        public AsyncAppenderRequest(final List<TraceContext> contextList) {
-            if (contextList != null) {
-                this.contextList = contextList;
-            } else {
-                this.contextList = new ArrayList<TraceContext>(1);
+        private String getTraceTopicName(String regionId) {
+            AccessChannel accessChannel = 
AsyncTraceDispatcher.this.getAccessChannel();
+            if (AccessChannel.CLOUD == accessChannel) {
+                return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
             }
+
+            return AsyncTraceDispatcher.this.getTraceTopicName();
         }
+    }
 
-        @Override
-        public void run() {
-            sendTraceData(contextList);
+    class TraceDataSegment {
+        public long firstBeanAddTime;
+        public int currentMsgSize;
+        public final String regionId;
+        public final List<TraceTransferBean> traceTransferBeanList = new 
ArrayList();
+
+        TraceDataSegment(String regionId) {
+            this.regionId = regionId;
         }
 
-        public void sendTraceData(List<TraceContext> contextList) {
-            Map<String, List<TraceTransferBean>> transBeanMap = new 
HashMap<String, List<TraceTransferBean>>();
-            for (TraceContext context : contextList) {
-                if (context.getTraceBeans().isEmpty()) {
-                    continue;
-                }
-                // Topic value corresponding to original message entity content
-                String topic = context.getTraceBeans().get(0).getTopic();
-                String regionId = context.getRegionId();
-                // Use  original message entity's topic as key
-                String key = topic;
-                if (!StringUtils.isBlank(regionId)) {
-                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
-                }
-                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
-                if (transBeanList == null) {
-                    transBeanList = new ArrayList<TraceTransferBean>();
-                    transBeanMap.put(key, transBeanList);
-                }
-                TraceTransferBean traceData = 
TraceDataEncoder.encoderFromContextBean(context);
-                transBeanList.add(traceData);
-            }
-            for (Map.Entry<String, List<TraceTransferBean>> entry : 
transBeanMap.entrySet()) {
-                String[] key = 
entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
-                String dataTopic = entry.getKey();
-                String regionId = null;
-                if (key.length > 1) {
-                    dataTopic = key[0];
-                    regionId = key[1];
-                }
-                flushData(entry.getValue(), dataTopic, regionId);
+        public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
+            initFirstBeanAddTime();
+            this.traceTransferBeanList.add(traceTransferBean);
+            this.currentMsgSize += traceTransferBean.getTransData().length();
+            if (currentMsgSize >= maxMsgSize) {
+                List<TraceTransferBean> dataToSend = new 
ArrayList(traceTransferBeanList);
+                AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(regionId, dataToSend);
+                traceExecutor.submit(asyncDataSendTask);
+
+                this.clear();
             }
         }
 
-        /**
-         * Batch sending data actually
-         */
-        private void flushData(List<TraceTransferBean> transBeanList, String 
dataTopic, String regionId) {
-            if (transBeanList.size() == 0) {
+        public void sendAllData() {
+            if (this.traceTransferBeanList.isEmpty()) {
                 return;
             }
-            // Temporary buffer
+            List<TraceTransferBean> dataToSend = new 
ArrayList(traceTransferBeanList);
+            AsyncDataSendTask asyncDataSendTask = new 
AsyncDataSendTask(regionId, dataToSend);
+            traceExecutor.submit(asyncDataSendTask);
+
+            this.clear();
+        }
+
+        private void initFirstBeanAddTime() {
+            if (firstBeanAddTime == 0) {
+                firstBeanAddTime = System.currentTimeMillis();
+            }
+        }
+
+        private void clear() {
+            this.firstBeanAddTime = 0;
+            this.currentMsgSize = 0;
+            this.traceTransferBeanList.clear();
+        }
+    }
+
+
+    class AsyncDataSendTask implements Runnable {
+        public final String regionId;
+        public final List<TraceTransferBean> traceTransferBeanList;
+
+        public AsyncDataSendTask(String regionId, List<TraceTransferBean> 
traceTransferBeanList) {
+            this.regionId = regionId;
+            this.traceTransferBeanList = traceTransferBeanList;
+        }
+
+        @Override
+        public void run() {
             StringBuilder buffer = new StringBuilder(1024);

Review Comment:
   It is a good habit to catch unchecked exceptions inside a Runnable when we 
do not check the Future result, because the thread pool will swallow the 
exception and we get no information if something unexpected happens.



##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -358,7 +382,7 @@ private void flushData(List<TraceTransferBean> 
transBeanList, String dataTopic,
          * @param keySet the keyset in this batch(including msgId in original 
message not offsetMsgId)
          * @param data   the message trace data in this batch
          */
-        private void sendTraceDataByMQ(Set<String> keySet, final String data, 
String dataTopic, String regionId) {
+        private void sendTraceDataByMQ(Set<String> keySet, final String data, 
String regionId) {
             String traceTopic = traceTopicName;

Review Comment:
   The topic has already been generated by getTraceTopicName in the earlier 
step, so it is better to reuse that topic here rather than generate it again.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to