sunxiaojian commented on code in PR #158:
URL: https://github.com/apache/rocketmq-connect/pull/158#discussion_r904703189
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java:
##########
@@ -583,28 +581,34 @@ private void receiveMessages(List<MessageExt> messages) {
private ConnectRecord convertToSinkDataEntry(MessageExt message) {
Map<String, String> properties = message.getProperties();
- Schema schema;
- Long timestamp;
- ConnectRecord sinkDataEntry = null;
- if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
+ ConnectRecord sinkDataEntry;
+
+ // start convert
+ if (recordConverter instanceof RecordConverter) {
+ // timestamp
String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
- timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
- String connectSchema = properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
- schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
- byte[] body = message.getBody();
- RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
+ Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
+ // partition and offset
+ RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
- String bodyStr = new String(body, StandardCharsets.UTF_8);
Review Comment:
> If the converter is null, could it fall back to the previous processing
logic as the default, i.e. the default StringConverter?
Done.
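For readers following the thread, the fallback the reviewer asks about can be sketched as below. This is only an illustration under stated assumptions: `Converter`, `DefaultStringConverter`, and `orDefault` are hypothetical stand-ins, not the types or methods used in rocketmq-connect, and the thread does not show how the PR actually resolved the point. The default simply decodes the message body as UTF-8, mirroring the `new String(body, StandardCharsets.UTF_8)` line removed in the hunk above.

```java
import java.nio.charset.StandardCharsets;

public class ConverterFallbackSketch {

    /** Hypothetical stand-in for the runtime's converter abstraction. */
    interface Converter {
        Object toConnectData(byte[] body);
    }

    /** Hypothetical default: decode the body as a UTF-8 string. */
    static final class DefaultStringConverter implements Converter {
        @Override
        public Object toConnectData(byte[] body) {
            return body == null ? null : new String(body, StandardCharsets.UTF_8);
        }
    }

    /** Returns the configured converter, or the string default when none is set. */
    static Converter orDefault(Converter configured) {
        return configured != null ? configured : new DefaultStringConverter();
    }

    public static void main(String[] args) {
        // No converter configured: the default string conversion is used.
        Converter converter = orDefault(null);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(converter.toConnectData(body));
    }
}
```

Resolving the default once, when the task is configured, would keep the per-message path free of null checks.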
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java:
##########
@@ -321,25 +321,27 @@ private void sendRecord() throws InterruptedException, RemotingException, MQClie
throw new ConnectException("source connect lack of topic config");
}
sourceMessage.setTopic(topic);
- putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
- if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
- Object payload = sourceDataEntry.getData();
- if (null != payload) {
- final byte[] messageBody = (String.valueOf(payload)).getBytes();
- if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
- log.error("Send record, message size is greater than {} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(sourceDataEntry));
- continue;
- }
- sourceMessage.setBody(messageBody);
+ // converter
+ if (recordConverter instanceof RecordConverter) {
Review Comment:
> If recordConverter is null, will it throw a NullPointerException?
Done.
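As background to the question above: in Java, `instanceof` evaluates to `false` for a `null` reference, so the `recordConverter instanceof RecordConverter` check in the new code does not itself throw. A null converter just skips that branch, which is why a fallback path still matters. A minimal sketch, where the nested `RecordConverter` interface and the `describe` helper are hypothetical stand-ins and not the project's code:

```java
public class InstanceofNullSketch {

    /** Hypothetical stand-in for the project's RecordConverter type. */
    interface RecordConverter {
    }

    /** Reports which branch an instanceof guard would take for the given reference. */
    static String describe(Object recordConverter) {
        if (recordConverter instanceof RecordConverter) {
            return "converter branch";
        }
        // Reached for null and for any object that is not a RecordConverter.
        return "fallback branch";
    }

    public static void main(String[] args) {
        System.out.println(describe(null));
        System.out.println(describe(new RecordConverter() { }));
    }
}
```

Any code in the fallback branch that dereferences `recordConverter` would still need its own null check.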