This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 65df86f  [ISSUE #220] Add unit test(errors module)  (#258)
65df86f is described below

commit 65df86f400d90c79ddda8147bacdad621d0b3707
Author: Oliver <[email protected]>
AuthorDate: Tue Aug 23 15:14:38 2022 +0800

    [ISSUE #220] Add unit test(errors module)  (#258)
    
    * add ut
    
    * update ut
    
    * add ut
    
    * add ut
---
 .../connectorwrapper/ServerResponseMocker.java     | 43 ++++++++--
 .../errors/DeadLetterQueueReporterTest.java        | 93 ++++++++++++++++++++++
 .../runtime/errors/ProcessingContextTest.java      | 39 +++++++++
 .../runtime/errors/ReporterManagerUtilTest.java    | 61 ++++++++++++++
 .../errors/RetryWithToleranceOperatorTest.java     | 90 +++++++++++++++++++++
 .../errors/WorkerErrorRecordReporterTest.java      | 72 +++++++++++++++++
 6 files changed, 393 insertions(+), 5 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
index f67f13f..6e9b3f2 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
@@ -31,8 +31,10 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.MixAll;
@@ -40,13 +42,10 @@ import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.netty.NettyDecoder;
 import org.apache.rocketmq.remoting.netty.NettyEncoder;
@@ -148,6 +147,12 @@ public abstract class ServerResponseMocker {
                     response.setBody(JSON.toJSONBytes(wrapper));
                     break;
                 }
+                case RequestCode.GET_ROUTEINFO_BY_TOPIC: {
+                    final TopicRouteData topicRouteData = 
buildTopicRouteData();
+                    response.setBody(JSON.toJSONBytes(topicRouteData));
+                    break;
+                }
+
                 default:
                     break;
             }
@@ -220,4 +225,32 @@ public abstract class ServerResponseMocker {
         subscriptionGroupWrapper.setDataVersion(dataVersion);
         return subscriptionGroupWrapper;
     }
+
+    private TopicRouteData buildTopicRouteData() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("mockBrokerName");
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(8);
+        queueData.setWriteQueueNums(8);
+        List<QueueData> queueDataList = new ArrayList<QueueData>();
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "127.0.0.1:10911");
+
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerData.setBrokerName("mockBrokerName");
+        brokerData.setCluster("mockCluster");
+
+        List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        topicRouteData.setFilterServerTable(new HashMap<>());
+        return topicRouteData;
+    }
 }
+
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
new file mode 100644
index 0000000..e9c8e48
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DeadLetterQueueReporterTest {
+
+    private ServerResponseMocker nameSrvMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    @Before
+    public void before() {
+        nameSrvMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello 
Wrold".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @After
+    public void after() {
+        nameSrvMocker.shutdown();
+        brokerMocker.shutdown();
+    }
+
+    @Test
+    public void buildTest() {
+        final DeadLetterQueueReporter reporter = 
buildDeadLetterQueueReporter();
+        Assert.assertNotNull(reporter);
+    }
+
+    @Test
+    public void reportTest() {
+        ProcessingContext processingContext = new ProcessingContext();
+        MessageExt messageExt = new MessageExt();
+        messageExt.setBrokerName("mockBrokerName");
+        messageExt.setQueueId(0);
+        messageExt.setTopic("mockTopic");
+        messageExt.setBody("Hello World".getBytes(StandardCharsets.UTF_8));
+        processingContext.consumerRecord(messageExt);
+        final DeadLetterQueueReporter reporter = 
buildDeadLetterQueueReporter();
+
+        Assertions.assertThatCode(() -> 
reporter.report(processingContext)).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void populateContextHeadersTest() {
+        Message producerRecord = new MessageExt();
+        producerRecord.setBody("Hello World".getBytes(StandardCharsets.UTF_8));
+        ProcessingContext processingContext = new ProcessingContext();
+        processingContext.stage(ErrorReporter.Stage.PRODUCE);
+        processingContext.executingClass(this.getClass());
+        final DeadLetterQueueReporter deadLetterQueueReporter = 
buildDeadLetterQueueReporter();
+        Assertions.assertThatCode(() -> 
deadLetterQueueReporter.populateContextHeaders(producerRecord, 
processingContext)).doesNotThrowAnyException();
+    }
+
+    private DeadLetterQueueReporter buildDeadLetterQueueReporter() {
+        ConnectKeyValue sinkConfig = new ConnectKeyValue();
+        Map<String, String> properties = new HashMap<>();
+        properties.put(DeadLetterQueueConfig.DLQ_TOPIC_NAME_CONFIG, 
"DEAD_LETTER_TOPIC");
+        sinkConfig.setProperties(properties);
+        ConnectConfig workerConfig = new ConnectConfig();
+        final DeadLetterQueueReporter deadLetterQueueReporter = 
DeadLetterQueueReporter.build("fileSinkConnector", sinkConfig, workerConfig);
+        return deadLetterQueueReporter;
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContextTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContextTest.java
new file mode 100644
index 0000000..dda99ca
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContextTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Test;
+
+public class ProcessingContextTest {
+
+    private ProcessingContext processingContext = new ProcessingContext();
+
+    @After
+    public void after() {
+        processingContext.close();
+    }
+
+    @Test
+    public void reportTest() {
+        processingContext.stage(ErrorReporter.Stage.CONSUME);
+        processingContext.toString(true);
+        Assertions.assertThatCode(() -> 
processingContext.report()).doesNotThrowAnyException();
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
new file mode 100644
index 0000000..db0c313
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/ReporterManagerUtilTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.RecordConverter;
+import java.util.List;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.converter.record.StringConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ReporterManagerUtilTest {
+
+    private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+    @Test
+    public void createRetryWithToleranceOperatorTest() {
+        final RetryWithToleranceOperator operator = 
ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
+        Assert.assertNotNull(operator);
+
+    }
+
+    @Test
+    public void createWorkerErrorRecordReporterTest() {
+        RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(100, 100, ToleranceType.ALL);
+        RecordConverter converter = new StringConverter();
+        connectKeyValue.put("errors.log.enable", "true");
+        connectKeyValue.put("errors.deadletterqueue.topic.name", "TEST_TOPIC");
+        final WorkerErrorRecordReporter reporter = 
ReporterManagerUtil.createWorkerErrorRecordReporter(connectKeyValue, 
retryWithToleranceOperator, converter);
+        Assert.assertNotNull(reporter);
+    }
+
+    @Test
+    public void sinkTaskReportersTest() {
+        ConnectConfig workerConfig = new ConnectConfig();
+        final List<ErrorReporter> connector = 
ReporterManagerUtil.sinkTaskReporters("testConnector", connectKeyValue, 
workerConfig);
+        Assert.assertEquals(1, connector.size());
+    }
+
+    @Test
+    public void sourceTaskReportersTest() {
+        final List<ErrorReporter> connector = 
ReporterManagerUtil.sourceTaskReporters("testSourceConnector", connectKeyValue);
+        Assert.assertEquals(1, connector.size());
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperatorTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperatorTest.java
new file mode 100644
index 0000000..d115283
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RetryWithToleranceOperatorTest {
+
+    private RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(1000, 1000, ToleranceType.ALL);
+
+    @Mock
+    private RecordPartition recordPartition;
+
+    @Mock
+    private RecordOffset recordOffset;
+
+    private  Operation operation = new Operation() {
+        @Override public Object call() throws Exception {
+            return true;
+        }
+    };
+
+    @Test
+    public void executeFailedTest() {
+        ConnectRecord connectRecord = new ConnectRecord(recordPartition, 
recordOffset, System.currentTimeMillis());
+        Assertions.assertThatCode(() -> 
retryWithToleranceOperator.executeFailed(ErrorReporter.Stage.CONSUME, 
this.getClass(),
+            connectRecord, new RuntimeException())).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void executeTest() {
+        final Object result = retryWithToleranceOperator.execute(operation, 
ErrorReporter.Stage.CONSUME, this.getClass());
+        Assert.assertTrue((Boolean) result);
+    }
+
+    @Test
+    public void execAndRetryTest() throws Exception {
+        final Object result = 
retryWithToleranceOperator.execAndRetry(operation);
+        Assert.assertTrue((Boolean) result);
+    }
+
+    @Test
+    public void execAndHandleErrorTest() {
+        operation = () -> {
+            int i = 0;
+            if (i ==0 ) {
+                throw new RuntimeException();
+            }
+            return true;
+        };
+        final Object result = 
retryWithToleranceOperator.execAndHandleError(operation, 
RuntimeException.class);
+        Assert.assertNull(result);
+    }
+
+    @Test
+    public void withinToleranceLimitsTest() {
+        final boolean result = 
retryWithToleranceOperator.withinToleranceLimits();
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void backoffTest() {
+        Assertions.assertThatCode(() -> retryWithToleranceOperator.backoff(2, 
System.currentTimeMillis() + 1000)).doesNotThrowAnyException();
+    }
+
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporterTest.java
new file mode 100644
index 0000000..f21d30a
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/WorkerErrorRecordReporterTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.errors;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.converter.record.StringConverter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerErrorRecordReporterTest {
+
+    private WorkerErrorRecordReporter workerErrorRecordReporter;
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+
+    private RecordConverter recordConverter;
+
+    private ConnectRecord connectRecord;
+
+    private RecordPartition recordPartition;
+
+    @Mock
+    private RecordOffset recordOffset;
+
+    @Before
+    public void before() {
+        retryWithToleranceOperator = new RetryWithToleranceOperator(1000, 
2000, ToleranceType.ALL);
+        recordConverter = new StringConverter();
+        workerErrorRecordReporter = new 
WorkerErrorRecordReporter(retryWithToleranceOperator, recordConverter);
+        Map<String, Object> partition = new HashMap<>();
+        partition.put("queueId", 0);
+        partition.put("topic", "DEFAULT_TOPIC");
+        partition.put("queueOffset", 0L);
+        partition.put("brokerName", "mockBrokerName");
+        recordPartition = new RecordPartition(partition);
+        connectRecord = new ConnectRecord(recordPartition, recordOffset, 
System.currentTimeMillis());
+        KeyValue extensions = new DefaultKeyValue();
+        extensions.put("extension1", "test1");
+        connectRecord.setExtensions(extensions);
+    }
+
+    @Test
+    public void reportTest() {
+        workerErrorRecordReporter.report(connectRecord, new 
RuntimeException());
+    }
+}

Reply via email to