baomingyu commented on a change in pull request #2091:
URL: https://github.com/apache/incubator-inlong/pull/2091#discussion_r777783682
##########
File path:
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
##########
@@ -408,49 +417,72 @@ private void formatMessagesAndSend(Map<String, String>
commonAttrMap,
StringBuilder sidBuilder = new StringBuilder();
sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
- .append(SEPARATOR).append(sequenceId);
+ .append(SEPARATOR).append(sequenceId);
headers.put(ConfigConstants.SEQUENCE_ID,
sidBuilder.toString());
}
headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
- Event event = EventBuilder.withBody(data, headers);
- long dtten = 0;
- try {
- dtten =
Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
- } catch (Exception e1) {
- long uniqVal =
Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
- throw new MessageIDException(uniqVal,
- ErrorCode.DT_ERROR,
- new Throwable("attribute dt=" +
headers.get(AttributeConstants.DATA_TIME
- + " has error, detail is: topic=" +
topicEntry.getKey() + "&streamId="
- + streamIdEntry.getKey() + "&NodeIP=" +
strRemoteIP), e1));
- }
+ // process proxy message list
+ this.processProxyMessageList(headers,
streamIdEntry.getValue());
Review comment:
the message of tdmsg format is deleted? the tdmsg format needs to be
retained. And new message format need add tow new file for messagehandler and
sink.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]