dongeforever commented on code in PR #4180:
URL: https://github.com/apache/rocketmq/pull/4180#discussion_r853767970
##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -243,113 +247,133 @@ class AsyncRunnable implements Runnable {
@Override
public void run() {
while (!stopped) {
- List<TraceContext> contexts = new
ArrayList<TraceContext>(batchSize);
synchronized (traceContextQueue) {
- for (int i = 0; i < batchSize; i++) {
- TraceContext context = null;
+ long endTime = System.currentTimeMillis() + pollingTimeMil;
+ while (System.currentTimeMillis() < endTime) {
try {
- //get trace data element from blocking Queue -
traceContextQueue
- context = traceContextQueue.poll(5,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- }
- if (context != null) {
- contexts.add(context);
- } else {
- break;
+ TraceContext traceContext = traceContextQueue.poll(
+ endTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
+ );
+
+ if (traceContext != null &&
!traceContext.getTraceBeans().isEmpty()) {
+ // get the topic which the trace message will
send to
+ String traceTopicName =
this.getTraceTopicName(traceContext.getRegionId());
+
+ // get the traceDataSegment which will save
this trace message, create if null
+ TraceDataSegment traceDataSegment =
taskQueueByTopic.get(traceTopicName);
+ if (traceDataSegment == null) {
+ traceDataSegment = new
TraceDataSegment(traceContext.getRegionId());
+ taskQueueByTopic.put(traceTopicName,
traceDataSegment);
+ }
+
+ // encode traceContext and save it into
traceDataSegment
+ // NOTE if data size in traceDataSegment more
than maxMsgSize,
+ // a AsyncDataSendTask will be created and
submitted
+ TraceTransferBean traceTransferBean =
TraceDataEncoder.encoderFromContextBean(traceContext);
+
traceDataSegment.addTraceTransferBean(traceTransferBean);
+ }
+ } catch (InterruptedException ignore) {
+ log.debug("traceContextQueue#poll exception");
}
}
- if (contexts.size() > 0) {
- AsyncAppenderRequest request = new
AsyncAppenderRequest(contexts);
- traceExecutor.submit(request);
- } else if (AsyncTraceDispatcher.this.stopped) {
+
+ // NOTE send the data in traceDataSegment which the first
TraceTransferBean
+ // is longer than waitTimeThreshold
+ sendDataByTimeThreshold();
+
+ if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
- }
- class AsyncAppenderRequest implements Runnable {
- List<TraceContext> contextList;
+ private void sendDataByTimeThreshold() {
+ long now = System.currentTimeMillis();
+ for (TraceDataSegment taskInfo : taskQueueByTopic.values()) {
+ if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) {
+ taskInfo.sendAllData();
+ }
+ }
+ }
- public AsyncAppenderRequest(final List<TraceContext> contextList) {
- if (contextList != null) {
- this.contextList = contextList;
- } else {
- this.contextList = new ArrayList<TraceContext>(1);
+ private String getTraceTopicName(String regionId) {
+ AccessChannel accessChannel =
AsyncTraceDispatcher.this.getAccessChannel();
+ if (AccessChannel.CLOUD == accessChannel) {
+ return TraceConstants.TRACE_TOPIC_PREFIX + regionId;
}
+
+ return AsyncTraceDispatcher.this.getTraceTopicName();
}
+ }
- @Override
- public void run() {
- sendTraceData(contextList);
+ class TraceDataSegment {
+ public long firstBeanAddTime;
+ public int currentMsgSize;
+ public final String regionId;
+ public final List<TraceTransferBean> traceTransferBeanList = new
ArrayList();
+
+ TraceDataSegment(String regionId) {
+ this.regionId = regionId;
}
- public void sendTraceData(List<TraceContext> contextList) {
- Map<String, List<TraceTransferBean>> transBeanMap = new
HashMap<String, List<TraceTransferBean>>();
- for (TraceContext context : contextList) {
- if (context.getTraceBeans().isEmpty()) {
- continue;
- }
- // Topic value corresponding to original message entity content
- String topic = context.getTraceBeans().get(0).getTopic();
- String regionId = context.getRegionId();
- // Use original message entity's topic as key
- String key = topic;
- if (!StringUtils.isBlank(regionId)) {
- key = key + TraceConstants.CONTENT_SPLITOR + regionId;
- }
- List<TraceTransferBean> transBeanList = transBeanMap.get(key);
- if (transBeanList == null) {
- transBeanList = new ArrayList<TraceTransferBean>();
- transBeanMap.put(key, transBeanList);
- }
- TraceTransferBean traceData =
TraceDataEncoder.encoderFromContextBean(context);
- transBeanList.add(traceData);
- }
- for (Map.Entry<String, List<TraceTransferBean>> entry :
transBeanMap.entrySet()) {
- String[] key =
entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
- String dataTopic = entry.getKey();
- String regionId = null;
- if (key.length > 1) {
- dataTopic = key[0];
- regionId = key[1];
- }
- flushData(entry.getValue(), dataTopic, regionId);
+ public void addTraceTransferBean(TraceTransferBean traceTransferBean) {
+ initFirstBeanAddTime();
+ this.traceTransferBeanList.add(traceTransferBean);
+ this.currentMsgSize += traceTransferBean.getTransData().length();
+ if (currentMsgSize >= maxMsgSize) {
+ List<TraceTransferBean> dataToSend = new
ArrayList(traceTransferBeanList);
+ AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(regionId, dataToSend);
+ traceExecutor.submit(asyncDataSendTask);
+
+ this.clear();
}
}
- /**
- * Batch sending data actually
- */
- private void flushData(List<TraceTransferBean> transBeanList, String
dataTopic, String regionId) {
- if (transBeanList.size() == 0) {
+ public void sendAllData() {
+ if (this.traceTransferBeanList.isEmpty()) {
return;
}
- // Temporary buffer
+ List<TraceTransferBean> dataToSend = new
ArrayList(traceTransferBeanList);
+ AsyncDataSendTask asyncDataSendTask = new
AsyncDataSendTask(regionId, dataToSend);
+ traceExecutor.submit(asyncDataSendTask);
+
+ this.clear();
+ }
+
+ private void initFirstBeanAddTime() {
+ if (firstBeanAddTime == 0) {
+ firstBeanAddTime = System.currentTimeMillis();
+ }
+ }
+
+ private void clear() {
+ this.firstBeanAddTime = 0;
+ this.currentMsgSize = 0;
+ this.traceTransferBeanList.clear();
+ }
+ }
+
+
+ class AsyncDataSendTask implements Runnable {
+ public final String regionId;
+ public final List<TraceTransferBean> traceTransferBeanList;
+
+ public AsyncDataSendTask(String regionId, List<TraceTransferBean>
traceTransferBeanList) {
+ this.regionId = regionId;
+ this.traceTransferBeanList = traceTransferBeanList;
+ }
+
+ @Override
+ public void run() {
StringBuilder buffer = new StringBuilder(1024);
Review Comment:
It is good practice to catch unchecked exceptions inside a Runnable when the
Future result is never retrieved: the thread pool will swallow the exception,
and we get no information if something unexpected happens.
##########
client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java:
##########
@@ -358,7 +382,7 @@ private void flushData(List<TraceTransferBean>
transBeanList, String dataTopic,
* @param keySet the keyset in this batch(including msgId in original
message not offsetMsgId)
* @param data the message trace data in this batch
*/
- private void sendTraceDataByMQ(Set<String> keySet, final String data,
String dataTopic, String regionId) {
+ private void sendTraceDataByMQ(Set<String> keySet, final String data,
String regionId) {
String traceTopic = traceTopicName;
Review Comment:
The topic is already generated by getTraceTopicName earlier in the process,
so it is better to reuse that topic here
rather than generate
it again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]