odbozhou commented on code in PR #158:
URL: https://github.com/apache/rocketmq-connect/pull/158#discussion_r904614982
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java:
##########
@@ -321,25 +321,27 @@ private void sendRecord() throws InterruptedException,
RemotingException, MQClie
throw new ConnectException("source connect lack of topic
config");
}
sourceMessage.setTopic(topic);
- putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
- if (null == recordConverter || recordConverter instanceof
RocketMQConverter) {
- Object payload = sourceDataEntry.getData();
- if (null != payload) {
- final byte[] messageBody =
(String.valueOf(payload)).getBytes();
- if (messageBody.length >
RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
- log.error("Send record, message size is greater than
{} bytes, sourceDataEntry: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE,
JSON.toJSONString(sourceDataEntry));
- continue;
- }
- sourceMessage.setBody(messageBody);
+ // converter
+ if (recordConverter instanceof RecordConverter) {
Review Comment:
If recordConverter is null,Will it throw a null pointer?
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java:
##########
@@ -583,28 +581,34 @@ private void receiveMessages(List<MessageExt> messages) {
private ConnectRecord convertToSinkDataEntry(MessageExt message) {
Map<String, String> properties = message.getProperties();
- Schema schema;
- Long timestamp;
- ConnectRecord sinkDataEntry = null;
- if (null == recordConverter || recordConverter instanceof
RocketMQConverter) {
+ ConnectRecord sinkDataEntry;
+
+ // start convert
+ if (recordConverter instanceof RecordConverter) {
Review Comment:
If recordConverter is null,Will it throw a null pointer?
##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java:
##########
@@ -583,28 +581,34 @@ private void receiveMessages(List<MessageExt> messages) {
private ConnectRecord convertToSinkDataEntry(MessageExt message) {
Map<String, String> properties = message.getProperties();
- Schema schema;
- Long timestamp;
- ConnectRecord sinkDataEntry = null;
- if (null == recordConverter || recordConverter instanceof
RocketMQConverter) {
+ ConnectRecord sinkDataEntry;
+
+ // start convert
+ if (recordConverter instanceof RecordConverter) {
+ // timestamp
String connectTimestamp =
properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
- timestamp = StringUtils.isNotEmpty(connectTimestamp) ?
Long.valueOf(connectTimestamp) : null;
- String connectSchema =
properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
- schema = StringUtils.isNotEmpty(connectSchema) ?
JSON.parseObject(connectSchema, Schema.class) : null;
- byte[] body = message.getBody();
- RecordPartition recordPartition =
ConnectUtil.convertToRecordPartition(message.getTopic(),
message.getBrokerName(), message.getQueueId());
+ Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ?
Long.valueOf(connectTimestamp) : null;
+ // partition and offset
+ RecordPartition recordPartition =
ConnectUtil.convertToRecordPartition(message.getTopic(),
message.getBrokerName(), message.getQueueId());
RecordOffset recordOffset =
ConnectUtil.convertToRecordOffset(message.getQueueOffset());
- String bodyStr = new String(body, StandardCharsets.UTF_8);
Review Comment:
If the converter is null, is it possible to use the previous one as the
default processing logic, the default StringConverter?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]