This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f46bb9  [ISSUE #220] Add unit test (connectorwrapper module)  (#225)
7f46bb9 is described below

commit 7f46bb97174e6defc4dc4af6cafadc343ad21464
Author: Oliver <[email protected]>
AuthorDate: Mon Aug 15 20:15:07 2022 +0800

    [ISSUE #220] Add unit test (connectorwrapper module)  (#225)
    
    * Add ut RecordOffsetManagementTest
    
    * Add ut SourceTaskOffsetCommitterTest
    
    * Add ut TransformChainTest
    
    * Add WorkerSinkTaskTest and WorkSinkTaskContextTest
    
    * Add WorkerSourceTaskTest
---
 .../RecordOffsetManagementTest.java                |  91 ++++++++++++++++
 .../connectorwrapper/ServerResponseMocker.java     |   5 +
 .../SourceTaskOffsetCommitterTest.java             |  84 +++++++++++++++
 .../runtime/connectorwrapper/TestTransform.java    |  38 +++++++
 .../connectorwrapper/TransformChainTest.java       |  63 +++++++++++
 .../WorkerSinkTaskContextTest.java                 |  87 +++++++++++++++
 .../connectorwrapper/WorkerSinkTaskTest.java       | 110 +++++++++++++++++++
 .../connectorwrapper/WorkerSourceTaskTest.java     | 118 +++++++++++++++++++++
 .../runtime/connectorwrapper/WorkerTest.java       |  39 +++++--
 .../connectorwrapper/testimpl/TestConverter.java   |  14 +--
 .../connectorwrapper/testimpl/TestSinkTask.java    |  38 +++++++
 11 files changed, 674 insertions(+), 13 deletions(-)

diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagementTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagementTest.java
new file mode 100644
index 0000000..de61099
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagementTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.RecordOffsetManagement.SubmittedPosition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RecordOffsetManagementTest {
+
+    private RecordOffsetManagement recordOffsetManagement  = new 
RecordOffsetManagement();
+
+    private SubmittedPosition submittedPosition;
+
+    @Mock
+    private RecordPosition position;
+
+    private RecordOffsetManagement.CommittableOffsets committableOffsets;
+
+    @Before
+    public void before() {
+        submittedPosition = recordOffsetManagement.submitRecord(position);
+        Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+        RecordOffset recordOffset = new RecordOffset(new HashMap<>());
+        RecordPartition recordPartition = new RecordPartition(new HashMap<>());
+        offsets.put(recordPartition, recordOffset);
+        committableOffsets  = new 
RecordOffsetManagement.CommittableOffsets(offsets, 10, 10, 10, 50, 
recordPartition);
+    }
+
+    @Test
+    public void submitRecordTest() {
+        final SubmittedPosition result = 
recordOffsetManagement.submitRecord(position);
+        assert result != null;
+    }
+
+    @Test
+    public void awaitAllMessagesTest() {
+        final boolean flag = recordOffsetManagement.awaitAllMessages(1000, 
TimeUnit.MILLISECONDS);
+        assert flag == false;
+    }
+
+    @Test
+    public void committableOffsetsTest() {
+        recordOffsetManagement.submitRecord(position);
+
+        final RecordOffsetManagement.CommittableOffsets offsets = 
recordOffsetManagement.committableOffsets();
+        assert offsets != null;
+    }
+
+    @Test
+    public void ackTest() {
+        submittedPosition.ack();
+    }
+
+    @Test
+    public void removeTest() {
+        final boolean remove = submittedPosition.remove();
+        assert remove == true;
+    }
+
+    @Test
+    public void updatedWithTest() {
+        final RecordOffsetManagement.CommittableOffsets offsets = 
committableOffsets.updatedWith(committableOffsets);
+        assert offsets != null;
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
index f9d4fc1..f67f13f 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
@@ -34,6 +34,11 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.DataVersion;
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitterTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitterTest.java
new file mode 100644
index 0000000..805a1d0
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitterTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SourceTaskOffsetCommitterTest {
+
+    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+
+    private ConnectConfig connectConfig = new ConnectConfig();
+
+    private ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(5);
+
+    private ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers = 
new ConcurrentHashMap<>();
+
+    @Mock
+    private ConnectorTaskId connectorTaskId;
+
+    @Mock
+    private WorkerSourceTask workerSourceTask;
+
+    @Before
+    public void before() {
+        connectConfig.setOffsetCommitIntervalMs(100);
+        sourceTaskOffsetCommitter = new 
SourceTaskOffsetCommitter(connectConfig);
+        sourceTaskOffsetCommitter = new 
SourceTaskOffsetCommitter(connectConfig, scheduledExecutorService, committers);
+        ScheduledFuture<?> commitFuture = 
scheduledExecutorService.scheduleWithFixedDelay(() -> {
+            if (workerSourceTask.commitOffsets()) {
+                return;
+            }
+        }, 100, 1000, TimeUnit.MILLISECONDS);
+
+        committers.put(connectorTaskId, commitFuture);
+    }
+
+    @After
+    public void after() {
+        scheduledExecutorService.shutdown();
+        sourceTaskOffsetCommitter.close(100);
+    }
+
+    @Test
+    public void scheduleTest() throws InterruptedException {
+        sourceTaskOffsetCommitter.schedule(connectorTaskId, workerSourceTask);
+        TimeUnit.SECONDS.sleep(1);
+    }
+
+    @Test
+    public void removeTest() {
+        sourceTaskOffsetCommitter.remove(connectorTaskId);
+    }
+
+
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TestTransform.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TestTransform.java
new file mode 100644
index 0000000..77ce668
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TestTransform.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+
+public class TestTransform implements Transform {
+    @Override public ConnectRecord doTransform(ConnectRecord record) {
+        final Object data = record.getData();
+        record.setData(data + "Hello World");
+        return record;
+    }
+
+    @Override public void start(KeyValue config) {
+
+    }
+
+    @Override public void stop() {
+
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChainTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChainTest.java
new file mode 100644
index 0000000..8c6f2a2
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChainTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransformChainTest {
+
+    private TransformChain transformChain;
+
+    private KeyValue keyValue = new DefaultKeyValue();
+
+    @Mock
+    private Plugin plugin;
+
+    @Mock
+    private ConnectRecord connectRecord;
+
+    @Before
+    public void before() {
+        keyValue.put(RuntimeConfigDefine.TRANSFORMS, "testTransform");
+        keyValue.put("transforms-testTransform-class", 
"org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform");
+        connectRecord.setExtensions(keyValue);
+        transformChain = new TransformChain(keyValue, plugin);
+    }
+
+    @After
+    public void after() throws Exception {
+        transformChain.close();
+    }
+
+    @Test
+    public void doTransformsTest() {
+        final ConnectRecord record = 
transformChain.doTransforms(connectRecord);
+        assert record != null;
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
new file mode 100644
index 0000000..f0dc3f2
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerSinkTaskContextTest {
+
+    private WorkerSinkTaskContext workerSinkTaskContext;
+
+    private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+    @Mock
+    private WorkerSinkTask workerSinkTask;
+
+    private DefaultMQPullConsumer defaultMQPullConsumer = new 
DefaultMQPullConsumer();
+
+    private RecordPartition recordPartition;
+
+    private RecordOffset recordOffset;
+
+    @Before
+    public void before() {
+        Map<String, String> partition = new HashMap<>();
+        recordPartition = new RecordPartition(partition);
+        partition.put("queueId", "0");
+        partition.put("brokerName", "broker_a");
+        partition.put("topic", "TEST_TOPIC");
+        Map<String, String> offset = new HashMap<>();
+        offset.put("queueOffset", "0");
+        recordOffset = new RecordOffset(offset);
+        workerSinkTaskContext = new WorkerSinkTaskContext(connectKeyValue, 
workerSinkTask, defaultMQPullConsumer);
+    }
+
+    @Test
+    public void resetOffsetTest() {
+        Assertions.assertThatCode(() -> 
workerSinkTaskContext.resetOffset(recordPartition, 
recordOffset)).doesNotThrowAnyException();
+
+        Map<RecordPartition, RecordOffset> offsets  = new HashMap<>();
+        offsets.put(recordPartition, recordOffset);
+        Assertions.assertThatCode(() ->  
workerSinkTaskContext.resetOffset(offsets)).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void pauseTest() {
+        List<RecordPartition> recordPartitions = new ArrayList<>();
+        recordPartitions.add(recordPartition);
+
+        Assertions.assertThatCode(() -> 
workerSinkTaskContext.pause(recordPartitions)).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void resumeTest() {
+        List<RecordPartition> recordPartitions = new ArrayList<>();
+        recordPartitions.add(recordPartition);
+        Assertions.assertThatCode(() -> 
workerSinkTaskContext.resume(recordPartitions)).doesNotThrowAnyException();
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskTest.java
new file mode 100644
index 0000000..df4d51e
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSinkTask;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerSinkTaskTest {
+
+    private WorkerSinkTask workerSinkTask;
+
+    private ConnectConfig connectConfig = new ConnectConfig();
+
+    private ConnectorTaskId connectorTaskId = new 
ConnectorTaskId("testConnector", 1);
+
+    private SinkTask sinkTask = new TestSinkTask();
+
+    private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+    private RecordConverter recordConverter = new TestConverter();
+
+    private DefaultMQPullConsumer defaultMQPullConsumer = new 
DefaultMQPullConsumer();
+
+    private AtomicReference<WorkerState> workerState = new 
AtomicReference<>(WorkerState.STARTED);
+
+    private ConnectStatsManager connectStatsManager = new 
ConnectStatsManager(connectConfig);
+
+    private ConnectStatsService connectStatsService = new 
ConnectStatsService();
+
+    private KeyValue keyValue = new DefaultKeyValue();
+
+    @Mock
+    private Plugin plugin;
+
+    private TransformChain<ConnectRecord> transformChain;
+
+    private RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(1000, 1000, ToleranceType.ALL);
+
+    private WorkerErrorRecordReporter workerErrorRecordReporter;
+
+    ExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+
+    @Before
+    public void before() {
+        connectKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, 
"TEST_TOPIC");
+        keyValue.put(RuntimeConfigDefine.TRANSFORMS, "testTransform");
+        keyValue.put("transforms-testTransform-class", 
"org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform");
+        transformChain = new TransformChain<>(keyValue, plugin);
+        workerErrorRecordReporter = new 
WorkerErrorRecordReporter(retryWithToleranceOperator, recordConverter);
+        workerSinkTask = new WorkerSinkTask(connectConfig, connectorTaskId, 
sinkTask, WorkerSinkTaskTest.class.getClassLoader(), connectKeyValue,
+            recordConverter, recordConverter, defaultMQPullConsumer, 
workerState, connectStatsManager, connectStatsService,
+            transformChain, retryWithToleranceOperator, 
workerErrorRecordReporter);
+    }
+
+    @After
+    public void after() {
+        executorService.shutdown();
+    }
+
+    @Test
+    public void executeTest() throws InterruptedException {
+        Assertions.assertThatCode(() -> executorService.submit(() -> 
workerSinkTask.run())).doesNotThrowAnyException();
+        TimeUnit.SECONDS.sleep(5);
+        workerSinkTask.close();
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskTest.java
new file mode 100644
index 0000000..d24f754
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerSourceTaskTest {
+
+    private WorkerSourceTask workerSourceTask;
+
+    private ConnectConfig connectConfig;
+
+    private ConnectorTaskId connectorTaskId = new 
ConnectorTaskId("testConnector" ,1);
+
+    private SourceTask sourceTask = new TestSourceTask();
+
+    private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+    private PositionManagementService positionManagementService = new 
TestPositionManageServiceImpl();
+
+    private RecordConverter recordConverter = new TestConverter();
+
+    @Mock
+    private DefaultMQProducer defaultMQProducer;
+
+    private AtomicReference<WorkerState> workerState = new 
AtomicReference<>(WorkerState.STARTED);
+
+    private ConnectStatsManager connectStatsManager;
+
+    private ConnectStatsService connectStatsService = new 
ConnectStatsService();
+
+    private TransformChain<ConnectRecord> transformChain;
+
+    private KeyValue keyValue = new DefaultKeyValue();
+
+    @Mock
+    private Plugin plugin;
+
+    private RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(1000, 1000, ToleranceType.ALL);
+
+    ExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+
+    @Before
+    public void before() throws MQClientException, InterruptedException {
+        connectConfig = new ConnectConfig();
+        connectConfig.setNamesrvAddr("127.0.0.1:9876");
+        connectStatsManager = new ConnectStatsManager(connectConfig);
+        connectKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, 
"TEST_TOPIC");
+        keyValue.put(RuntimeConfigDefine.TRANSFORMS, "testTransform");
+        keyValue.put("transforms-testTransform-class", 
"org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform");
+        transformChain = new TransformChain<>(keyValue, plugin);
+        workerSourceTask = new WorkerSourceTask(connectConfig, 
connectorTaskId, sourceTask, this.getClass().getClassLoader(),
+            connectKeyValue, positionManagementService, recordConverter, 
recordConverter, defaultMQProducer, workerState,
+            connectStatsManager, connectStatsService, transformChain, 
retryWithToleranceOperator);
+        NameServerMocker.startByDefaultConf(9876, 10911);
+        ServerResponseMocker.startServer(10911, "Hello 
World".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @After
+    public void after() {
+        executorService.shutdown();
+    }
+
+    @Test
+    public void runTest() throws InterruptedException {
+        Assertions.assertThatCode(() -> executorService.submit(() -> 
workerSourceTask.run())).doesNotThrowAnyException();
+        TimeUnit.SECONDS.sleep(5);
+        workerSourceTask.close();
+    }
+}
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 63e966b..e3eb87f 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -28,8 +28,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSinkTask;
 import 
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -38,6 +40,8 @@ import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnect
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
 import 
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import 
org.apache.rocketmq.connect.runtime.controller.isolation.DelegatingClassLoader;
+import 
org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
 import org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter;
 import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
@@ -56,6 +60,8 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class WorkerTest {
@@ -91,8 +97,19 @@ public class WorkerTest {
     @Mock
     private ConnectStatsService connectStatsService;
 
+    @Mock
+    private DelegatingClassLoader delegatingClassLoader;
+
+    @Mock
+    private PluginClassLoader pluginClassLoader;
+
     @Before
     public void init() {
+        
when(plugin.currentThreadLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
+        when(plugin.delegatingLoader()).thenReturn(delegatingClassLoader);
+        
when(plugin.currentThreadLoader()).thenReturn(this.getClass().getClassLoader());
+        when(plugin.newConnector(any())).thenReturn(new TestConnector());
+        
when(delegatingClassLoader.pluginClassLoader(any())).thenReturn(pluginClassLoader);
         connectConfig = new ConnectConfig();
         connectConfig.setHttpPort(8081);
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + 
File.separator + "testConnectorStore");
@@ -118,20 +135,21 @@ public class WorkerTest {
             // create retry operator
             RetryWithToleranceOperator retryWithToleranceOperator = 
ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
             
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("TEST-CONN-"
 + i, connectKeyValue));
-
-            runnables.add(new WorkerSourceTask(new ConnectConfig(),
-                    new ConnectorTaskId("TEST-CONN-" + i,i),
+            final WorkerSourceTask task = new WorkerSourceTask(new 
ConnectConfig(),
+                new ConnectorTaskId("TEST-CONN-" + i, i),
                 new TestSourceTask(),
                 null,
                 connectKeyValue,
                 new TestPositionManageServiceImpl(),
                 new JsonConverter(),
-                    new JsonConverter(),
+                new JsonConverter(),
                 producer,
                 new AtomicReference(WorkerState.STARTED),
                 connectStatsManager, connectStatsService,
                 transformChain,
-                retryWithToleranceOperator));
+                retryWithToleranceOperator);
+           runnables.add(task);
+
         }
         worker.setWorkingTasks(runnables);
         assertThat(worker.getWorkingTasks().size()).isEqualTo(3);
@@ -140,7 +158,8 @@ public class WorkerTest {
     }
 
     @After
-    public void destory() {
+    public void destroy() throws InterruptedException {
+        TimeUnit.SECONDS.sleep(2);
         worker.stop();
         TestUtils.deleteFile(new File(System.getProperty("user.home") + 
File.separator + "testConnectorStore"));
     }
@@ -172,6 +191,14 @@ public class WorkerTest {
             ConnectKeyValue connectKeyValue = new ConnectKeyValue();
             connectKeyValue.getProperties().put("key1", "TEST-CONN-" + i + 
"1");
             connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i + 
"2");
+            if (i == 1) {
+                // start direct task
+                
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_TYPE, 
Worker.TaskType.DIRECT.name());
+                
connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_TASK_CLASS, 
TestSourceTask.class.getName());
+                
connectKeyValue.getProperties().put(RuntimeConfigDefine.SINK_TASK_CLASS, 
TestSinkTask.class.getName());
+            } else {
+                
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_TYPE, 
Worker.TaskType.SOURCE.name());
+            }
             
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_CLASS, 
TestSourceTask.class.getName());
             
connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,
 TestConverter.class.getName());
             
connectKeyValue.getProperties().put(RuntimeConfigDefine.NAMESRV_ADDR, 
"127.0.0.1:9876");
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
index f1483f8..5aaa230 100644
--- 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
@@ -17,17 +17,17 @@
 
 package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
 
-import io.openmessaging.connector.api.data.Converter;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
 
-public class TestConverter implements Converter {
+public class TestConverter implements RecordConverter {
 
-    @Override
-    public byte[] objectToByte(Object object) {
-        return "test-converter".getBytes();
+    @Override public byte[] fromConnectData(String topic, Schema schema, 
Object value) {
+        return new byte[0];
     }
 
-    @Override
-    public Object byteToObject(byte[] bytes) {
+    @Override public SchemaAndValue toConnectData(String topic, byte[] value) {
         return null;
     }
 }
diff --git 
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSinkTask.java
 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSinkTask.java
new file mode 100644
index 0000000..1ef3344
--- /dev/null
+++ 
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSinkTask.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.List;
+
+public class TestSinkTask extends SinkTask {
+    @Override public void put(List<ConnectRecord> sinkRecords) throws 
ConnectException {
+
+    }
+
+    @Override public void start(KeyValue config) {
+
+    }
+
+    @Override public void stop() {
+
+    }
+}

Reply via email to