This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 7f46bb9 [ISSUE #220] Add unit test (connectorwrapper module) (#225)
7f46bb9 is described below
commit 7f46bb97174e6defc4dc4af6cafadc343ad21464
Author: Oliver <[email protected]>
AuthorDate: Mon Aug 15 20:15:07 2022 +0800
[ISSUE #220] Add unit test (connectorwrapper module) (#225)
* Add unit test RecordOffsetManagementTest
* Add unit test SourceTaskOffsetCommitterTest
* Add unit test TransformChainTest
* Add WorkerSinkTaskTest and WorkerSinkTaskContextTest
* Add WorkerSourceTaskTest
---
.../RecordOffsetManagementTest.java | 91 ++++++++++++++++
.../connectorwrapper/ServerResponseMocker.java | 5 +
.../SourceTaskOffsetCommitterTest.java | 84 +++++++++++++++
.../runtime/connectorwrapper/TestTransform.java | 38 +++++++
.../connectorwrapper/TransformChainTest.java | 63 +++++++++++
.../WorkerSinkTaskContextTest.java | 87 +++++++++++++++
.../connectorwrapper/WorkerSinkTaskTest.java | 110 +++++++++++++++++++
.../connectorwrapper/WorkerSourceTaskTest.java | 118 +++++++++++++++++++++
.../runtime/connectorwrapper/WorkerTest.java | 39 +++++--
.../connectorwrapper/testimpl/TestConverter.java | 14 +--
.../connectorwrapper/testimpl/TestSinkTask.java | 38 +++++++
11 files changed, 674 insertions(+), 13 deletions(-)
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagementTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagementTest.java
new file mode 100644
index 0000000..de61099
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/RecordOffsetManagementTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.RecordOffsetManagement.SubmittedPosition;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RecordOffsetManagementTest {
+
+ private RecordOffsetManagement recordOffsetManagement = new
RecordOffsetManagement();
+
+ private SubmittedPosition submittedPosition;
+
+ @Mock
+ private RecordPosition position;
+
+ private RecordOffsetManagement.CommittableOffsets committableOffsets;
+
+ @Before
+ public void before() {
+ submittedPosition = recordOffsetManagement.submitRecord(position);
+ Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+ RecordOffset recordOffset = new RecordOffset(new HashMap<>());
+ RecordPartition recordPartition = new RecordPartition(new HashMap<>());
+ offsets.put(recordPartition, recordOffset);
+ committableOffsets = new
RecordOffsetManagement.CommittableOffsets(offsets, 10, 10, 10, 50,
recordPartition);
+ }
+
+ @Test
+ public void submitRecordTest() {
+ final SubmittedPosition result =
recordOffsetManagement.submitRecord(position);
+ assert result != null;
+ }
+
+ @Test
+ public void awaitAllMessagesTest() {
+ final boolean flag = recordOffsetManagement.awaitAllMessages(1000,
TimeUnit.MILLISECONDS);
+ assert flag == false;
+ }
+
+ @Test
+ public void committableOffsetsTest() {
+ recordOffsetManagement.submitRecord(position);
+
+ final RecordOffsetManagement.CommittableOffsets offsets =
recordOffsetManagement.committableOffsets();
+ assert offsets != null;
+ }
+
+ @Test
+ public void ackTest() {
+ submittedPosition.ack();
+ }
+
+ @Test
+ public void removeTest() {
+ final boolean remove = submittedPosition.remove();
+ assert remove == true;
+ }
+
+ @Test
+ public void updatedWithTest() {
+ final RecordOffsetManagement.CommittableOffsets offsets =
committableOffsets.updatedWith(committableOffsets);
+ assert offsets != null;
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
index f9d4fc1..f67f13f 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
@@ -34,6 +34,11 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.common.DataVersion;
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitterTest.java
new file mode 100644
index 0000000..805a1d0
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/SourceTaskOffsetCommitterTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SourceTaskOffsetCommitterTest {
+
+ private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+
+ private ConnectConfig connectConfig = new ConnectConfig();
+
+ private ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(5);
+
+ private ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers =
new ConcurrentHashMap<>();
+
+ @Mock
+ private ConnectorTaskId connectorTaskId;
+
+ @Mock
+ private WorkerSourceTask workerSourceTask;
+
+ @Before
+ public void before() {
+ connectConfig.setOffsetCommitIntervalMs(100);
+ sourceTaskOffsetCommitter = new
SourceTaskOffsetCommitter(connectConfig);
+ sourceTaskOffsetCommitter = new
SourceTaskOffsetCommitter(connectConfig, scheduledExecutorService, committers);
+ ScheduledFuture<?> commitFuture =
scheduledExecutorService.scheduleWithFixedDelay(() -> {
+ if (workerSourceTask.commitOffsets()) {
+ return;
+ }
+ }, 100, 1000, TimeUnit.MILLISECONDS);
+
+ committers.put(connectorTaskId, commitFuture);
+ }
+
+ @After
+ public void after() {
+ scheduledExecutorService.shutdown();
+ sourceTaskOffsetCommitter.close(100);
+ }
+
+ @Test
+ public void scheduleTest() throws InterruptedException {
+ sourceTaskOffsetCommitter.schedule(connectorTaskId, workerSourceTask);
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ @Test
+ public void removeTest() {
+ sourceTaskOffsetCommitter.remove(connectorTaskId);
+ }
+
+
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TestTransform.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TestTransform.java
new file mode 100644
index 0000000..77ce668
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TestTransform.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.data.ConnectRecord;
+
+public class TestTransform implements Transform {
+ @Override public ConnectRecord doTransform(ConnectRecord record) {
+ final Object data = record.getData();
+ record.setData(data + "Hello World");
+ return record;
+ }
+
+ @Override public void start(KeyValue config) {
+
+ }
+
+ @Override public void stop() {
+
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChainTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChainTest.java
new file mode 100644
index 0000000..8c6f2a2
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChainTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TransformChainTest {
+
+ private TransformChain transformChain;
+
+ private KeyValue keyValue = new DefaultKeyValue();
+
+ @Mock
+ private Plugin plugin;
+
+ @Mock
+ private ConnectRecord connectRecord;
+
+ @Before
+ public void before() {
+ keyValue.put(RuntimeConfigDefine.TRANSFORMS, "testTransform");
+ keyValue.put("transforms-testTransform-class",
"org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform");
+ connectRecord.setExtensions(keyValue);
+ transformChain = new TransformChain(keyValue, plugin);
+ }
+
+ @After
+ public void after() throws Exception {
+ transformChain.close();
+ }
+
+ @Test
+ public void doTransformsTest() {
+ final ConnectRecord record =
transformChain.doTransforms(connectRecord);
+ assert record != null;
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
new file mode 100644
index 0000000..f0dc3f2
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerSinkTaskContextTest {
+
+ private WorkerSinkTaskContext workerSinkTaskContext;
+
+ private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+ @Mock
+ private WorkerSinkTask workerSinkTask;
+
+ private DefaultMQPullConsumer defaultMQPullConsumer = new
DefaultMQPullConsumer();
+
+ private RecordPartition recordPartition;
+
+ private RecordOffset recordOffset;
+
+ @Before
+ public void before() {
+ Map<String, String> partition = new HashMap<>();
+ recordPartition = new RecordPartition(partition);
+ partition.put("queueId", "0");
+ partition.put("brokerName", "broker_a");
+ partition.put("topic", "TEST_TOPIC");
+ Map<String, String> offset = new HashMap<>();
+ offset.put("queueOffset", "0");
+ recordOffset = new RecordOffset(offset);
+ workerSinkTaskContext = new WorkerSinkTaskContext(connectKeyValue,
workerSinkTask, defaultMQPullConsumer);
+ }
+
+ @Test
+ public void resetOffsetTest() {
+ Assertions.assertThatCode(() ->
workerSinkTaskContext.resetOffset(recordPartition,
recordOffset)).doesNotThrowAnyException();
+
+ Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+ offsets.put(recordPartition, recordOffset);
+ Assertions.assertThatCode(() ->
workerSinkTaskContext.resetOffset(offsets)).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void pauseTest() {
+ List<RecordPartition> recordPartitions = new ArrayList<>();
+ recordPartitions.add(recordPartition);
+
+ Assertions.assertThatCode(() ->
workerSinkTaskContext.pause(recordPartitions)).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void resumeTest() {
+ List<RecordPartition> recordPartitions = new ArrayList<>();
+ recordPartitions.add(recordPartition);
+ Assertions.assertThatCode(() ->
workerSinkTaskContext.resume(recordPartitions)).doesNotThrowAnyException();
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskTest.java
new file mode 100644
index 0000000..df4d51e
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSinkTask;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerSinkTaskTest {
+
+ private WorkerSinkTask workerSinkTask;
+
+ private ConnectConfig connectConfig = new ConnectConfig();
+
+ private ConnectorTaskId connectorTaskId = new
ConnectorTaskId("testConnector", 1);
+
+ private SinkTask sinkTask = new TestSinkTask();
+
+ private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+ private RecordConverter recordConverter = new TestConverter();
+
+ private DefaultMQPullConsumer defaultMQPullConsumer = new
DefaultMQPullConsumer();
+
+ private AtomicReference<WorkerState> workerState = new
AtomicReference<>(WorkerState.STARTED);
+
+ private ConnectStatsManager connectStatsManager = new
ConnectStatsManager(connectConfig);
+
+ private ConnectStatsService connectStatsService = new
ConnectStatsService();
+
+ private KeyValue keyValue = new DefaultKeyValue();
+
+ @Mock
+ private Plugin plugin;
+
+ private TransformChain<ConnectRecord> transformChain;
+
+ private RetryWithToleranceOperator retryWithToleranceOperator = new
RetryWithToleranceOperator(1000, 1000, ToleranceType.ALL);
+
+ private WorkerErrorRecordReporter workerErrorRecordReporter;
+
+ ExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ @Before
+ public void before() {
+ connectKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME,
"TEST_TOPIC");
+ keyValue.put(RuntimeConfigDefine.TRANSFORMS, "testTransform");
+ keyValue.put("transforms-testTransform-class",
"org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform");
+ transformChain = new TransformChain<>(keyValue, plugin);
+ workerErrorRecordReporter = new
WorkerErrorRecordReporter(retryWithToleranceOperator, recordConverter);
+ workerSinkTask = new WorkerSinkTask(connectConfig, connectorTaskId,
sinkTask, WorkerSinkTaskTest.class.getClassLoader(), connectKeyValue,
+ recordConverter, recordConverter, defaultMQPullConsumer,
workerState, connectStatsManager, connectStatsService,
+ transformChain, retryWithToleranceOperator,
workerErrorRecordReporter);
+ }
+
+ @After
+ public void after() {
+ executorService.shutdown();
+ }
+
+ @Test
+ public void executeTest() throws InterruptedException {
+ Assertions.assertThatCode(() -> executorService.submit(() ->
workerSinkTask.run())).doesNotThrowAnyException();
+ TimeUnit.SECONDS.sleep(5);
+ workerSinkTask.close();
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskTest.java
new file mode 100644
index 0000000..d24f754
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTaskTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
+import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class WorkerSourceTaskTest {
+
+ private WorkerSourceTask workerSourceTask;
+
+ private ConnectConfig connectConfig;
+
+ private ConnectorTaskId connectorTaskId = new
ConnectorTaskId("testConnector" ,1);
+
+ private SourceTask sourceTask = new TestSourceTask();
+
+ private ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+
+ private PositionManagementService positionManagementService = new
TestPositionManageServiceImpl();
+
+ private RecordConverter recordConverter = new TestConverter();
+
+ @Mock
+ private DefaultMQProducer defaultMQProducer;
+
+ private AtomicReference<WorkerState> workerState = new
AtomicReference<>(WorkerState.STARTED);
+
+ private ConnectStatsManager connectStatsManager;
+
+ private ConnectStatsService connectStatsService = new
ConnectStatsService();
+
+ private TransformChain<ConnectRecord> transformChain;
+
+ private KeyValue keyValue = new DefaultKeyValue();
+
+ @Mock
+ private Plugin plugin;
+
+ private RetryWithToleranceOperator retryWithToleranceOperator = new
RetryWithToleranceOperator(1000, 1000, ToleranceType.ALL);
+
+ ExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ @Before
+ public void before() throws MQClientException, InterruptedException {
+ connectConfig = new ConnectConfig();
+ connectConfig.setNamesrvAddr("127.0.0.1:9876");
+ connectStatsManager = new ConnectStatsManager(connectConfig);
+ connectKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME,
"TEST_TOPIC");
+ keyValue.put(RuntimeConfigDefine.TRANSFORMS, "testTransform");
+ keyValue.put("transforms-testTransform-class",
"org.apache.rocketmq.connect.runtime.connectorwrapper.TestTransform");
+ transformChain = new TransformChain<>(keyValue, plugin);
+ workerSourceTask = new WorkerSourceTask(connectConfig,
connectorTaskId, sourceTask, this.getClass().getClassLoader(),
+ connectKeyValue, positionManagementService, recordConverter,
recordConverter, defaultMQProducer, workerState,
+ connectStatsManager, connectStatsService, transformChain,
retryWithToleranceOperator);
+ NameServerMocker.startByDefaultConf(9876, 10911);
+ ServerResponseMocker.startServer(10911, "Hello
World".getBytes(StandardCharsets.UTF_8));
+ }
+
+ @After
+ public void after() {
+ executorService.shutdown();
+ }
+
+ @Test
+ public void runTest() throws InterruptedException {
+ Assertions.assertThatCode(() -> executorService.submit(() ->
workerSourceTask.run())).doesNotThrowAnyException();
+ TimeUnit.SECONDS.sleep(5);
+ workerSourceTask.close();
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 63e966b..e3eb87f 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -28,8 +28,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSinkTask;
import
org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -38,6 +40,8 @@ import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnect
import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConverter;
import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import
org.apache.rocketmq.connect.runtime.controller.isolation.DelegatingClassLoader;
+import
org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
@@ -56,6 +60,8 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class WorkerTest {
@@ -91,8 +97,19 @@ public class WorkerTest {
@Mock
private ConnectStatsService connectStatsService;
+ @Mock
+ private DelegatingClassLoader delegatingClassLoader;
+
+ @Mock
+ private PluginClassLoader pluginClassLoader;
+
@Before
public void init() {
+
when(plugin.currentThreadLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
+ when(plugin.delegatingLoader()).thenReturn(delegatingClassLoader);
+
when(plugin.currentThreadLoader()).thenReturn(this.getClass().getClassLoader());
+ when(plugin.newConnector(any())).thenReturn(new TestConnector());
+
when(delegatingClassLoader.pluginClassLoader(any())).thenReturn(pluginClassLoader);
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
connectConfig.setStorePathRootDir(System.getProperty("user.home") +
File.separator + "testConnectorStore");
@@ -118,20 +135,21 @@ public class WorkerTest {
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator =
ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("TEST-CONN-"
+ i, connectKeyValue));
-
- runnables.add(new WorkerSourceTask(new ConnectConfig(),
- new ConnectorTaskId("TEST-CONN-" + i,i),
+ final WorkerSourceTask task = new WorkerSourceTask(new
ConnectConfig(),
+ new ConnectorTaskId("TEST-CONN-" + i, i),
new TestSourceTask(),
null,
connectKeyValue,
new TestPositionManageServiceImpl(),
new JsonConverter(),
- new JsonConverter(),
+ new JsonConverter(),
producer,
new AtomicReference(WorkerState.STARTED),
connectStatsManager, connectStatsService,
transformChain,
- retryWithToleranceOperator));
+ retryWithToleranceOperator);
+ runnables.add(task);
+
}
worker.setWorkingTasks(runnables);
assertThat(worker.getWorkingTasks().size()).isEqualTo(3);
@@ -140,7 +158,8 @@ public class WorkerTest {
}
@After
- public void destory() {
+ public void destroy() throws InterruptedException {
+ TimeUnit.SECONDS.sleep(2);
worker.stop();
TestUtils.deleteFile(new File(System.getProperty("user.home") +
File.separator + "testConnectorStore"));
}
@@ -172,6 +191,14 @@ public class WorkerTest {
ConnectKeyValue connectKeyValue = new ConnectKeyValue();
connectKeyValue.getProperties().put("key1", "TEST-CONN-" + i +
"1");
connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i +
"2");
+ if (i == 1) {
+ // start direct task
+
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_TYPE,
Worker.TaskType.DIRECT.name());
+
connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_TASK_CLASS,
TestSourceTask.class.getName());
+
connectKeyValue.getProperties().put(RuntimeConfigDefine.SINK_TASK_CLASS,
TestSinkTask.class.getName());
+ } else {
+
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_TYPE,
Worker.TaskType.SOURCE.name());
+ }
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_CLASS,
TestSourceTask.class.getName());
connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,
TestConverter.class.getName());
connectKeyValue.getProperties().put(RuntimeConfigDefine.NAMESRV_ADDR,
"127.0.0.1:9876");
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
index f1483f8..5aaa230 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConverter.java
@@ -17,17 +17,17 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
-import io.openmessaging.connector.api.data.Converter;
+import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaAndValue;
-public class TestConverter implements Converter {
+public class TestConverter implements RecordConverter {
- @Override
- public byte[] objectToByte(Object object) {
- return "test-converter".getBytes();
+ @Override public byte[] fromConnectData(String topic, Schema schema,
Object value) {
+ return new byte[0];
}
- @Override
- public Object byteToObject(byte[] bytes) {
+ @Override public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSinkTask.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSinkTask.java
new file mode 100644
index 0000000..1ef3344
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestSinkTask.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.List;
+
+public class TestSinkTask extends SinkTask {
+ @Override public void put(List<ConnectRecord> sinkRecords) throws
ConnectException {
+
+ }
+
+ @Override public void start(KeyValue config) {
+
+ }
+
+ @Override public void stop() {
+
+ }
+}