This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new c8500c89 [ISSUE #220] Add unit test (#353)
c8500c89 is described below
commit c8500c891afdd6cfa31710b049a856815ff3ec40
Author: Oliver <[email protected]>
AuthorDate: Tue Oct 18 09:56:26 2022 +0800
[ISSUE #220] Add unit test (#353)
---
.../runtime/serialization/FloatSerializer.java | 1 +
.../store/ConnectKeyValueDeserializer.java | 2 +-
.../connectorwrapper/WorkerDirectTaskTest.java | 144 +++++++++++++++++++++
.../serialization/FloatDeserializerTest.java} | 36 +++---
.../serialization/FloatSerializerTest.java} | 38 +++---
.../store/ConnectKeyValueDeserializerTest.java | 37 ++++++
.../store/ConnectKeyValueSerdeTest.java | 31 +++++
.../store/ConnectKeyValueSerializerTest.java | 36 ++++++
.../store/RecordPositionMapDeserializerTest.java | 51 ++++++++
.../store/RecordPositionMapSerdeTest.java | 40 ++++++
.../store/RecordPositionMapSerializerfTest.java | 51 ++++++++
.../MemoryStateManagementServiceImplTest.java | 74 +++++++++++
12 files changed, 507 insertions(+), 34 deletions(-)
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
index f511ee08..da03eae7 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.runtime.serialization;
public class FloatSerializer implements Serializer<Float> {
+
@Override
public byte[] serialize(final String topic, final Float data) {
if (data == null) {
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
index 66a78a14..353fd877 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
+++
b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
@@ -40,7 +40,7 @@ public class ConnectKeyValueDeserializer implements
Deserializer<ConnectKeyValue
return JSON.parseObject(jsonString, ConnectKeyValue.class);
} catch (UnsupportedEncodingException e) {
- log.error("ConnAndTaskConfigConverter#byteToObject failed", e);
+ log.error("ConnectKeyValueDeserializer#deserialize failed", e);
}
return null;
}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTaskTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTaskTest.java
new file mode 100644
index 00000000..00e3676c
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTaskTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import com.google.common.collect.Lists;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.status.WrapperStatusListener;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSinkTask;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.errors.ErrorMetricsGroup;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.connect.runtime.metrics.ConnectMetrics;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import
org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WorkerDirectTaskTest {
+
+ private WorkerDirectTask workerDirectTask;
+
+ private WorkerConfig workerConfig;
+
+ private ConnectorTaskId connectorTaskId;
+
+ private SourceTask sourceTask;
+
+ private ClassLoader classLoader;
+
+ private SinkTask sinkTask;
+
+ private ConnectKeyValue connectKeyValue;
+
+ private PositionManagementService positionManagementService;
+
+ private AtomicReference<WorkerState> workerState = new AtomicReference<>();
+
+ private ConnectStatsManager connectStatsManager;
+
+ private ConnectStatsService connectStatsService;
+
+ TransformChain<ConnectRecord> transformChain;
+
+ private KeyValue keyValue;
+
+ private Plugin plugin;
+
+ private RetryWithToleranceOperator retryWithToleranceOperator;
+
+ private ErrorMetricsGroup errorMetricsGroup;
+
+ private ConnectMetrics connectMetrics;
+
+ private WrapperStatusListener wrapperStatusListener;
+
+ private StateManagementService stateManagementService;
+
+ private ExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
+ @Before
+ public void before() {
+ workerConfig = new WorkerConfig();
+ connectorTaskId = new ConnectorTaskId("testConnector", 1);
+ sourceTask = new TestSourceTask();
+ classLoader = this.getClass().getClassLoader();
+ sinkTask = new TestSinkTask();
+ connectKeyValue = new ConnectKeyValue();
+ positionManagementService = new PositionManagementServiceImpl();
+ workerState.set(WorkerState.STARTED);
+ connectStatsManager = new ConnectStatsManager(workerConfig);
+ connectStatsService = new ConnectStatsService();
+ keyValue = new DefaultKeyValue();
+ plugin = new Plugin(Lists.newArrayList());
+ transformChain = new TransformChain<>(keyValue, plugin);
+ connectMetrics = new ConnectMetrics(workerConfig);
+ errorMetricsGroup = new ErrorMetricsGroup(connectorTaskId,
connectMetrics);
+ stateManagementService = new StateManagementServiceImpl();
+ wrapperStatusListener = new
WrapperStatusListener(stateManagementService, "defaultWorker1");
+ retryWithToleranceOperator = new RetryWithToleranceOperator(1000,
1000, ToleranceType.ALL, errorMetricsGroup);
+
+ workerDirectTask = new WorkerDirectTask(workerConfig,
+ connectorTaskId,
+ sourceTask,
+ classLoader,
+ sinkTask,
+ connectKeyValue,
+ positionManagementService,
+ workerState,
+ connectStatsManager,
+ connectStatsService,
+ transformChain,
+ retryWithToleranceOperator,
+ wrapperStatusListener,
+ connectMetrics);
+
+ workerDirectTask.doInitializeAndStart();
+ Runnable runnable = () -> workerDirectTask.execute();
+ executorService.submit(runnable);
+
+ }
+
+ @After
+ public void after() {
+ workerDirectTask.close();
+ }
+
+ @Test
+ public void initializeAndStartTest() {
+ Assertions.assertThatCode(() ->
workerDirectTask.initializeAndStart()).doesNotThrowAnyException();
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatDeserializerTest.java
similarity index 50%
copy from
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
copy to
rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatDeserializerTest.java
index f511ee08..3cf8f0f9 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatDeserializerTest.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,21 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.connect.runtime.serialization;
-public class FloatSerializer implements Serializer<Float> {
- @Override
- public byte[] serialize(final String topic, final Float data) {
- if (data == null) {
- return null;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FloatDeserializerTest {
+
+ @Test
+ public void deserializeTest() {
+ FloatDeserializer deserializer = new FloatDeserializer();
+ Float num = 1.2f;
+ int fbit = Float.floatToIntBits(num);
+ byte[] b = new byte[4];
+ for (int i = 0; i < 4; i++) {
+ b[i] = (byte) (fbit >> (24 - i * 8));
}
+ final Float result = deserializer.deserialize("testTopic", b);
+ Assert.assertEquals(num, result);
- long bits = Float.floatToRawIntBits(data);
- return new byte[] {
- (byte) (bits >>> 24),
- (byte) (bits >>> 16),
- (byte) (bits >>> 8),
- (byte) bits
- };
}
-}
\ No newline at end of file
+}
diff --git
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializerTest.java
similarity index 52%
copy from
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
copy to
rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializerTest.java
index f511ee08..fa087494 100644
---
a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializerTest.java
@@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,21 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.connect.runtime.serialization;
-public class FloatSerializer implements Serializer<Float> {
- @Override
- public byte[] serialize(final String topic, final Float data) {
- if (data == null) {
- return null;
- }
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FloatSerializerTest {
- long bits = Float.floatToRawIntBits(data);
- return new byte[] {
- (byte) (bits >>> 24),
- (byte) (bits >>> 16),
- (byte) (bits >>> 8),
- (byte) bits
- };
+ @Test
+ public void serializeTest() {
+ Float num = 1.2f;
+ FloatSerializer serializer = new FloatSerializer();
+ final byte[] data = serializer.serialize("testTopic", num);
+
+ int value = 0;
+ for (byte b : data) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ Assert.assertTrue(num == Float.intBitsToFloat(value));
}
-}
\ No newline at end of file
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
new file mode 100644
index 00000000..0536c549
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectKeyValueDeserializerTest {
+
+ @Test
+ public void deserializeTest() {
+ ConnectKeyValueDeserializer deserializer = new
ConnectKeyValueDeserializer();
+ ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+ connectKeyValue.put("connector.topic", "testTopic");
+ connectKeyValue.put("max.task", 2);
+ final ConnectKeyValue result = deserializer.deserialize("testTopic",
JSON.toJSONBytes(connectKeyValue));
+ Assert.assertEquals(2, result.getInt("max.task"));
+ Assert.assertEquals("testTopic", result.getString("connector.topic"));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerdeTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerdeTest.java
new file mode 100644
index 00000000..1b3b4c94
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerdeTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectKeyValueSerdeTest {
+
+ @Test
+ public void serdeTest() {
+ final ConnectKeyValueSerde serde = ConnectKeyValueSerde.serde();
+ Assert.assertEquals(ConnectKeyValueSerializer.class.getName(),
serde.serializer().getClass().getName());
+ Assert.assertEquals(ConnectKeyValueDeserializer.class.getName(),
serde.deserializer().getClass().getName());
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
new file mode 100644
index 00000000..c0f0ffa1
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectKeyValueSerializerTest {
+
+ @Test
+ public void serializeTest() {
+ ConnectKeyValueSerializer serializer = new ConnectKeyValueSerializer();
+ ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+ connectKeyValue.put("connect.topic", "testTopic");
+ connectKeyValue.put("max.task", 2);
+ final byte[] result = serializer.serialize("testTopic",
connectKeyValue);
+ Assert.assertEquals(new String(JSON.toJSONBytes(connectKeyValue)), new
String(result));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializerTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializerTest.java
new file mode 100644
index 00000000..26ed13ce
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPositionMapDeserializerTest {
+
+ @Test
+ public void deserializeTest() {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put("defaultPartition", "defaultPartition");
+ ExtendRecordPartition partition = new
ExtendRecordPartition("testNamespace", partitionMap);
+
+ Map<String, Integer> offsetMap = new HashMap<>();
+ offsetMap.put("testOffset", 1);
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ Map<String, String> map = new HashMap<>();
+ map.put(JSON.toJSONString(partition), JSON.toJSONString(recordOffset));
+
+ RecordPositionMapDeserializer deserializer = new
RecordPositionMapDeserializer();
+ final Map<ExtendRecordPartition, RecordOffset> result =
deserializer.deserialize("testTopic", JSON.toJSONBytes(map));
+ for (Map.Entry<ExtendRecordPartition, RecordOffset> entry :
result.entrySet()) {
+ final ExtendRecordPartition key = entry.getKey();
+ final RecordOffset value = entry.getValue();
+ Assert.assertEquals(partition.toString(), key.toString());
+ Assert.assertEquals(recordOffset.toString(), value.toString());
+ }
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerdeTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerdeTest.java
new file mode 100644
index 00000000..a1c979ee
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerdeTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
+import org.apache.rocketmq.connect.runtime.serialization.Serializer;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class RecordPositionMapSerdeTest {
+
+ @Test
+ public void serdeTest() {
+ final RecordPositionMapSerde serde = RecordPositionMapSerde.serde();
+ final Serializer<Map<ExtendRecordPartition, RecordOffset>> serializer
= serde.serializer();
+ final Deserializer<Map<ExtendRecordPartition, RecordOffset>>
deserializer = serde.deserializer();
+ Assertions.assertThat(serializer instanceof
RecordPositionMapSerializer).isTrue();
+ Assertions.assertThat(deserializer instanceof
RecordPositionMapDeserializer).isTrue();
+
+ }
+
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializerfTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializerfTest.java
new file mode 100644
index 00000000..fad2b1ff
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializerfTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPositionMapSerializerfTest {
+
+ @Test
+ public void serializeTest() {
+ Map<ExtendRecordPartition, RecordOffset> data = new HashMap<>();
+
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put("defaultPartition", "defaultPartition");
+
+ ExtendRecordPartition partition = new
ExtendRecordPartition("testNamespace", partitionMap);
+
+ Map<String, Integer> offsetMap = new HashMap<>();
+ offsetMap.put("offset", 1);
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ data.put(partition, recordOffset);
+
+ Map<String, String> stringMap = new HashMap<>();
+ stringMap.put(JSON.toJSONString(partition),
JSON.toJSONString(recordOffset));
+
+ RecordPositionMapSerializer serializer = new
RecordPositionMapSerializer();
+ final byte[] bytes = serializer.serialize("testTopic", data);
+ Assert.assertEquals(com.alibaba.fastjson.JSON.toJSONString(stringMap),
new String(bytes));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImplTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImplTest.java
new file mode 100644
index 00000000..249b0494
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImplTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordConverter;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus;
+import
org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
+import org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemoryStateManagementServiceImplTest {
+
+ private MemoryStateManagementServiceImpl service;
+
+ private WorkerConfig workerConfig;
+
+ private RecordConverter recordConverter;
+
+ @Before
+ public void before() {
+ service = new MemoryStateManagementServiceImpl();
+ workerConfig = new WorkerConfig();
+ recordConverter = new JsonConverter();
+ service.initialize(workerConfig, recordConverter);
+ }
+
+ @Test
+ public void TaskStatusTest() {
+ ConnectorStatus connectorStatus = new ConnectorStatus("testConnector",
AbstractStatus.State.RUNNING, "defaultWorkId", 1L);
+ service.put(connectorStatus);
+
+ ConnectorTaskId taskId = new ConnectorTaskId("testConnector", 2);
+ TaskStatus taskStatus = new TaskStatus(taskId,
AbstractStatus.State.RUNNING, "defaultWorkId", 1L);
+ service.put(taskStatus);
+
+ final TaskStatus resultTaskStatus = service.get(taskId);
+ Assert.assertEquals(taskStatus.toString(),
resultTaskStatus.toString());
+
+ final ConnectorStatus resultConnectorStatus =
service.get("testConnector");
+ Assert.assertEquals(connectorStatus.toString(),
resultConnectorStatus.toString());
+
+ final Collection<TaskStatus> taskStatusList =
service.getAll("testConnector");
+ for (TaskStatus status : taskStatusList) {
+ Assert.assertEquals(taskStatus.toString(), status.toString());
+ }
+
+ final Set<String> connectors = service.connectors();
+ for (String connector : connectors) {
+ Assert.assertEquals("testConnector", connector);
+ }
+ }
+}