This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 4758256  fix compile error
4758256 is described below

commit 475825677fb0da3e06013587fad8f3e2a7c8c7cc
Author: yuntian.zb <[email protected]>
AuthorDate: Thu Jul 7 19:01:02 2022 +0800

    fix compile error
---
 .../org/apache/rocketmq/connect/runtime/ConnectController.java   | 0
 .../connect/runtime/connectorwrapper/WorkerDirectTask.java       | 6 ------
 .../connect/runtime/errors/WorkerErrorRecordReporter.java        | 9 ++++-----
 3 files changed, 4 insertions(+), 11 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectController.java
deleted file mode 100644
index e69de29..0000000
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index de3d25b..3387a80 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -179,9 +179,6 @@ public class WorkerDirectTask implements WorkerTask {
                 return taskConfig;
             }
 
-            @Override public KeyValue configs() {
-                return taskConfig;
-            }
 
             @Override
             public void resetOffset(RecordPartition recordPartition, 
RecordOffset recordOffset) {
@@ -230,9 +227,6 @@ public class WorkerDirectTask implements WorkerTask {
                 return taskConfig.getString(RuntimeConfigDefine.TASK_ID);
             }
 
-            @Override public KeyValue configs() {
-                return taskConfig;
-            }
             /**
              * Get the configurations of current task.
              *
diff --git 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
index d37ee7c..18323a6 100644
--- 
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
+++ 
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporter.java
@@ -25,7 +25,6 @@ import io.openmessaging.connector.api.data.RecordConverter;
 import io.openmessaging.connector.api.data.RecordPartition;
 import org.apache.rocketmq.common.message.MessageExt;
 
-
 /**
  * worker error record reporter
  */
@@ -35,7 +34,7 @@ public class WorkerErrorRecordReporter implements 
ErrorRecordReporter {
     private RecordConverter converter;
 
     public WorkerErrorRecordReporter(RetryWithToleranceOperator 
retryWithToleranceOperator,
-                                     RecordConverter converter) {
+        RecordConverter converter) {
         this.retryWithToleranceOperator = retryWithToleranceOperator;
         this.converter = converter;
     }
@@ -56,18 +55,18 @@ public class WorkerErrorRecordReporter implements 
ErrorRecordReporter {
         String brokerName = partition.getPartition().containsKey("brokerName") 
? String.valueOf(partition.getPartition().get("topic")) : null;
 
         MessageExt consumerRecord = new MessageExt();
-        if (converter != null && converter instanceof RecordConverter){
+        if (converter != null && converter instanceof RecordConverter) {
             byte[] value = converter.fromConnectData(topic, 
record.getSchema(), record.getData());
             consumerRecord.setBody(value);
             consumerRecord.setBrokerName(brokerName);
             consumerRecord.setQueueId(queueId);
             consumerRecord.setQueueOffset(queueOffset);
-        }else {
+        } else {
             byte[] messageBody = JSON.toJSONString(record).getBytes();
             consumerRecord.setBody(messageBody);
         }
         // add extensions
-        record.getExtensions().keySet().forEach(key->{
+        record.getExtensions().keySet().forEach(key -> {
             consumerRecord.putUserProperty(key, 
record.getExtensions().getString(key));
         });
         retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.TASK_PUT, 
SinkTask.class, consumerRecord, error);

Reply via email to