This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new c8500c89 [ISSUE #220] Add unit test (#353)
c8500c89 is described below

commit c8500c891afdd6cfa31710b049a856815ff3ec40
Author: Oliver <[email protected]>
AuthorDate: Tue Oct 18 09:56:26 2022 +0800

    [ISSUE #220] Add unit test (#353)
---
 .../runtime/serialization/FloatSerializer.java     |   1 +
 .../store/ConnectKeyValueDeserializer.java         |   2 +-
 .../connectorwrapper/WorkerDirectTaskTest.java     | 144 +++++++++++++++++++++
 .../serialization/FloatDeserializerTest.java}      |  36 +++---
 .../serialization/FloatSerializerTest.java}        |  38 +++---
 .../store/ConnectKeyValueDeserializerTest.java     |  37 ++++++
 .../store/ConnectKeyValueSerdeTest.java            |  31 +++++
 .../store/ConnectKeyValueSerializerTest.java       |  36 ++++++
 .../store/RecordPositionMapDeserializerTest.java   |  51 ++++++++
 .../store/RecordPositionMapSerdeTest.java          |  40 ++++++
 .../store/RecordPositionMapSerializerfTest.java    |  51 ++++++++
 .../MemoryStateManagementServiceImplTest.java      |  74 +++++++++++
 12 files changed, 507 insertions(+), 34 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
index f511ee08..da03eae7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.connect.runtime.serialization;
 
 public class FloatSerializer implements Serializer<Float> {
+
     @Override
     public byte[] serialize(final String topic, final Float data) {
         if (data == null) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
index 66a78a14..353fd877 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
@@ -40,7 +40,7 @@ public class ConnectKeyValueDeserializer implements Deserializer<ConnectKeyValue
 
             return JSON.parseObject(jsonString, ConnectKeyValue.class);
         } catch (UnsupportedEncodingException e) {
-            log.error("ConnAndTaskConfigConverter#byteToObject failed", e);
+            log.error("ConnectKeyValueDeserializer#deserialize failed", e);
         }
         return null;
     }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTaskTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTaskTest.java
new file mode 100644
index 00000000..00e3676c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTaskTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.connectorwrapper;
+
+import com.google.common.collect.Lists;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.WrapperStatusListener;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSinkTask;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestSourceTask;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.errors.ErrorMetricsGroup;
+import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
+import org.apache.rocketmq.connect.runtime.metrics.ConnectMetrics;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
+import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class WorkerDirectTaskTest {
+
+    private WorkerDirectTask workerDirectTask;
+
+    private WorkerConfig workerConfig;
+
+    private ConnectorTaskId connectorTaskId;
+
+    private SourceTask sourceTask;
+
+    private ClassLoader classLoader;
+
+    private SinkTask sinkTask;
+
+    private ConnectKeyValue connectKeyValue;
+
+    private PositionManagementService positionManagementService;
+
+    private AtomicReference<WorkerState> workerState = new AtomicReference<>();
+
+    private ConnectStatsManager connectStatsManager;
+
+    private ConnectStatsService connectStatsService;
+
+    TransformChain<ConnectRecord> transformChain;
+
+    private KeyValue keyValue;
+
+    private Plugin plugin;
+
+    private RetryWithToleranceOperator retryWithToleranceOperator;
+
+    private ErrorMetricsGroup errorMetricsGroup;
+
+    private ConnectMetrics connectMetrics;
+
+    private WrapperStatusListener wrapperStatusListener;
+
+    private StateManagementService stateManagementService;
+
+    private ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
+    @Before
+    public void before() {
+        workerConfig = new WorkerConfig();
+        connectorTaskId = new ConnectorTaskId("testConnector", 1);
+        sourceTask = new TestSourceTask();
+        classLoader = this.getClass().getClassLoader();
+        sinkTask = new TestSinkTask();
+        connectKeyValue = new ConnectKeyValue();
+        positionManagementService = new PositionManagementServiceImpl();
+        workerState.set(WorkerState.STARTED);
+        connectStatsManager = new ConnectStatsManager(workerConfig);
+        connectStatsService = new ConnectStatsService();
+        keyValue = new DefaultKeyValue();
+        plugin = new Plugin(Lists.newArrayList());
+        transformChain = new TransformChain<>(keyValue, plugin);
+        connectMetrics = new ConnectMetrics(workerConfig);
+        errorMetricsGroup = new ErrorMetricsGroup(connectorTaskId, connectMetrics);
+        stateManagementService = new StateManagementServiceImpl();
+        wrapperStatusListener = new WrapperStatusListener(stateManagementService, "defaultWorker1");
+        retryWithToleranceOperator = new RetryWithToleranceOperator(1000, 1000, ToleranceType.ALL, errorMetricsGroup);
+
+        workerDirectTask = new WorkerDirectTask(workerConfig,
+            connectorTaskId,
+            sourceTask,
+            classLoader,
+            sinkTask,
+            connectKeyValue,
+            positionManagementService,
+            workerState,
+            connectStatsManager,
+            connectStatsService,
+            transformChain,
+            retryWithToleranceOperator,
+            wrapperStatusListener,
+            connectMetrics);
+
+        workerDirectTask.doInitializeAndStart();
+        Runnable runnable = () -> workerDirectTask.execute();
+        executorService.submit(runnable);
+
+    }
+
+    @After
+    public void after() {
+        workerDirectTask.close();
+    }
+
+    @Test
+    public void initializeAndStartTest() {
+        Assertions.assertThatCode(() -> workerDirectTask.initializeAndStart()).doesNotThrowAnyException();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatDeserializerTest.java
similarity index 50%
copy from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
copy to rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatDeserializerTest.java
index f511ee08..3cf8f0f9 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatDeserializerTest.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,21 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.connect.runtime.serialization;
 
-public class FloatSerializer implements Serializer<Float> {
-    @Override
-    public byte[] serialize(final String topic, final Float data) {
-        if (data == null) {
-            return null;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FloatDeserializerTest {
+
+    @Test
+    public void deserializeTest() {
+        FloatDeserializer deserializer = new FloatDeserializer();
+        Float num = 1.2f;
+        int fbit = Float.floatToIntBits(num);
+        byte[] b = new byte[4];
+        for (int i = 0; i < 4; i++) {
+            b[i] = (byte) (fbit >> (24 - i * 8));
         }
+        final Float result = deserializer.deserialize("testTopic", b);
+        Assert.assertEquals(num, result);
 
-        long bits = Float.floatToRawIntBits(data);
-        return new byte[] {
-            (byte) (bits >>> 24),
-            (byte) (bits >>> 16),
-            (byte) (bits >>> 8),
-            (byte) bits
-        };
     }
-}
\ No newline at end of file
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializerTest.java
similarity index 52%
copy from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
copy to rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializerTest.java
index f511ee08..fa087494 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializer.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/FloatSerializerTest.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,21 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.connect.runtime.serialization;
 
-public class FloatSerializer implements Serializer<Float> {
-    @Override
-    public byte[] serialize(final String topic, final Float data) {
-        if (data == null) {
-            return null;
-        }
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FloatSerializerTest {
 
-        long bits = Float.floatToRawIntBits(data);
-        return new byte[] {
-            (byte) (bits >>> 24),
-            (byte) (bits >>> 16),
-            (byte) (bits >>> 8),
-            (byte) bits
-        };
+    @Test
+    public void serializeTest() {
+        Float num = 1.2f;
+        FloatSerializer serializer = new FloatSerializer();
+        final byte[] data = serializer.serialize("testTopic", num);
+
+        int value = 0;
+        for (byte b : data) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        Assert.assertTrue(num == Float.intBitsToFloat(value));
     }
-}
\ No newline at end of file
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
new file mode 100644
index 00000000..0536c549
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializerTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectKeyValueDeserializerTest {
+
+    @Test
+    public void deserializeTest() {
+        ConnectKeyValueDeserializer deserializer = new ConnectKeyValueDeserializer();
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("connector.topic", "testTopic");
+        connectKeyValue.put("max.task", 2);
+        final ConnectKeyValue result = deserializer.deserialize("testTopic", JSON.toJSONBytes(connectKeyValue));
+        Assert.assertEquals(2, result.getInt("max.task"));
+        Assert.assertEquals("testTopic", result.getString("connector.topic"));
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerdeTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerdeTest.java
new file mode 100644
index 00000000..1b3b4c94
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerdeTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectKeyValueSerdeTest {
+
+    @Test
+    public void serdeTest() {
+        final ConnectKeyValueSerde serde = ConnectKeyValueSerde.serde();
+        Assert.assertEquals(ConnectKeyValueSerializer.class.getName(), serde.serializer().getClass().getName());
+        Assert.assertEquals(ConnectKeyValueDeserializer.class.getName(), serde.deserializer().getClass().getName());
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
new file mode 100644
index 00000000..c0f0ffa1
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializerTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConnectKeyValueSerializerTest {
+
+    @Test
+    public void serializeTest() {
+        ConnectKeyValueSerializer serializer = new ConnectKeyValueSerializer();
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("connect.topic", "testTopic");
+        connectKeyValue.put("max.task", 2);
+        final byte[] result = serializer.serialize("testTopic", connectKeyValue);
+        Assert.assertEquals(new String(JSON.toJSONBytes(connectKeyValue)), new String(result));
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializerTest.java
new file mode 100644
index 00000000..26ed13ce
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPositionMapDeserializerTest {
+
+    @Test
+    public void deserializeTest() {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("defaultPartition", "defaultPartition");
+        ExtendRecordPartition partition = new ExtendRecordPartition("testNamespace", partitionMap);
+
+        Map<String, Integer> offsetMap = new HashMap<>();
+        offsetMap.put("testOffset", 1);
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        Map<String, String> map = new HashMap<>();
+        map.put(JSON.toJSONString(partition), JSON.toJSONString(recordOffset));
+
+        RecordPositionMapDeserializer deserializer = new RecordPositionMapDeserializer();
+        final Map<ExtendRecordPartition, RecordOffset> result = deserializer.deserialize("testTopic", JSON.toJSONBytes(map));
+        for (Map.Entry<ExtendRecordPartition, RecordOffset> entry : result.entrySet()) {
+            final ExtendRecordPartition key = entry.getKey();
+            final RecordOffset value = entry.getValue();
+            Assert.assertEquals(partition.toString(), key.toString());
+            Assert.assertEquals(recordOffset.toString(), value.toString());
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerdeTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerdeTest.java
new file mode 100644
index 00000000..a1c979ee
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerdeTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
+import org.apache.rocketmq.connect.runtime.serialization.Serializer;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class RecordPositionMapSerdeTest {
+
+    @Test
+    public void serdeTest() {
+        final RecordPositionMapSerde serde = RecordPositionMapSerde.serde();
+        final Serializer<Map<ExtendRecordPartition, RecordOffset>> serializer = serde.serializer();
+        final Deserializer<Map<ExtendRecordPartition, RecordOffset>> deserializer = serde.deserializer();
+        Assertions.assertThat(serializer instanceof RecordPositionMapSerializer).isTrue();
+        Assertions.assertThat(deserializer instanceof RecordPositionMapDeserializer).isTrue();
+
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializerfTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializerfTest.java
new file mode 100644
index 00000000..fad2b1ff
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializerfTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.serialization.store;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPositionMapSerializerfTest {
+
+    @Test
+    public void serializeTest() {
+        Map<ExtendRecordPartition, RecordOffset> data = new HashMap<>();
+
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("defaultPartition", "defaultPartition");
+
+        ExtendRecordPartition partition = new ExtendRecordPartition("testNamespace", partitionMap);
+
+        Map<String, Integer> offsetMap = new HashMap<>();
+        offsetMap.put("offset", 1);
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        data.put(partition, recordOffset);
+
+        Map<String, String> stringMap = new HashMap<>();
+        stringMap.put(JSON.toJSONString(partition), JSON.toJSONString(recordOffset));
+
+        RecordPositionMapSerializer serializer = new RecordPositionMapSerializer();
+        final byte[] bytes = serializer.serialize("testTopic", data);
+        Assert.assertEquals(com.alibaba.fastjson.JSON.toJSONString(stringMap), new String(bytes));
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImplTest.java
new file mode 100644
index 00000000..249b0494
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImplTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordConverter;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
+import org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter;
+import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemoryStateManagementServiceImplTest {
+
+    private MemoryStateManagementServiceImpl service;
+
+    private WorkerConfig workerConfig;
+
+    private RecordConverter recordConverter;
+
+    @Before
+    public void before() {
+        service = new MemoryStateManagementServiceImpl();
+        workerConfig = new WorkerConfig();
+        recordConverter = new JsonConverter();
+        service.initialize(workerConfig, recordConverter);
+    }
+
+    @Test
+    public void TaskStatusTest() {
+        ConnectorStatus connectorStatus = new ConnectorStatus("testConnector", AbstractStatus.State.RUNNING, "defaultWorkId", 1L);
+        service.put(connectorStatus);
+
+        ConnectorTaskId taskId = new ConnectorTaskId("testConnector", 2);
+        TaskStatus taskStatus = new TaskStatus(taskId, AbstractStatus.State.RUNNING, "defaultWorkId", 1L);
+        service.put(taskStatus);
+
+        final TaskStatus resultTaskStatus = service.get(taskId);
+        Assert.assertEquals(taskStatus.toString(), resultTaskStatus.toString());
+
+        final ConnectorStatus resultConnectorStatus = service.get("testConnector");
+        Assert.assertEquals(connectorStatus.toString(), resultConnectorStatus.toString());
+
+        final Collection<TaskStatus> taskStatusList = service.getAll("testConnector");
+        for (TaskStatus status : taskStatusList) {
+            Assert.assertEquals(taskStatus.toString(), status.toString());
+        }
+
+        final Set<String> connectors = service.connectors();
+        for (String connector : connectors) {
+            Assert.assertEquals("testConnector", connector);
+        }
+    }
+}

Reply via email to