This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new bddbe3c Fix failed unit tests and add unit test (#241)
bddbe3c is described below
commit bddbe3c68e6aa738f8e0743c72bd7213756a27f8
Author: Oliver <[email protected]>
AuthorDate: Mon Aug 15 14:32:00 2022 +0800
Fix failed unit tests and add unit test (#241)
---
.../converter/ConnAndTaskConfigConverterTest.java | 51 ++++++++++++++++++
.../runtime/converter/ListConverterTest.java | 50 ++++++++++++++++++
.../converter/RecordOffsetConverterTest.java | 47 +++++++++++++++++
.../converter/RecordPartitionConverterTest.java | 47 +++++++++++++++++
.../converter/RecordPositionMapConverterTest.java | 55 +++++++++++++++++++
.../converter/record/ByteArrayConverterTest.java | 47 +++++++++++++++++
.../converter/record/ConverterConfigTest.java | 45 ++++++++++++++++
.../converter/record/JsonConverterTest.java | 61 +++++++++++++---------
8 files changed, 378 insertions(+), 25 deletions(-)
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ConnAndTaskConfigConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ConnAndTaskConfigConverterTest.java
new file mode 100644
index 0000000..33b2831
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ConnAndTaskConfigConverterTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class ConnAndTaskConfigConverterTest {
+
+ private ConnAndTaskConfigConverter connAndTaskConfigConverter = new
ConnAndTaskConfigConverter();
+
+ @Test
+ public void objectToByteTest() {
+ ConnAndTaskConfigs connAndTaskConfigs = new ConnAndTaskConfigs();
+ Map<String, ConnectKeyValue> connectorConfigs = new HashMap<>();
+ ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+ connectKeyValue.put("nameSrvAddr", "127.0.0.1:9876");
+ connectorConfigs.put("nameSrv", connectKeyValue);
+ connAndTaskConfigs.setConnectorConfigs(connectorConfigs);
+ final byte[] bytes =
connAndTaskConfigConverter.objectToByte(connAndTaskConfigs);
+ String expected =
"{\"task\":{},\"connector\":{\"nameSrv\":\"{\\\"nameSrvAddr\\\":\\\"127.0.0.1:9876\\\"}\"}}";
+ Assertions.assertThat(expected.equals(new String(bytes)));
+ }
+
+ @Test
+ public void byteToObjectTest() {
+ String str =
"{\"task\":{},\"connector\":{\"nameSrv\":\"{\\\"nameSrvAddr\\\":\\\"127.0.0.1:9876\\\"}\"}}";
+ final ConnAndTaskConfigs configs =
connAndTaskConfigConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+
Assertions.assertThat("127.0.0.1:9876".equals(configs.getConnectorConfigs().get("nameSrv").getString("nameSrvAddr")));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ListConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ListConverterTest.java
new file mode 100644
index 0000000..ce1660f
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/ListConverterTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ListConverterTest {
+
+ private ListConverter listConverter = new ListConverter(Object.class);
+
+
+ @Test
+ public void objectToByteTest() {
+ List<String> list = new ArrayList<>();
+ list.add("Hello World");
+ final byte[] bytes = listConverter.objectToByte(list);
+ Assert.assertEquals("[\"Hello World\"]", new String(bytes));
+ }
+
+ @Test
+ public void byteToObjectTest() {
+ List<String> list = new ArrayList<>();
+ list.add("Hello World");
+ final List actual = listConverter.byteToObject(JSON.toJSONBytes(list));
+ Assert.assertEquals("[Hello World]", actual.toString());
+
+ }
+
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordOffsetConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordOffsetConverterTest.java
new file mode 100644
index 0000000..3167d68
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordOffsetConverterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordOffsetConverterTest {
+
+ private RecordOffsetConverter recordOffsetConverter =new
RecordOffsetConverter();
+
+ @Test
+ public void objectToByteTest() {
+ Map<String, Integer> offset =new HashMap<>();
+ offset.put("nextPosition", 123);
+ RecordOffset recordOffset = new RecordOffset(offset);
+ final byte[] actual = recordOffsetConverter.objectToByte(recordOffset);
+ Assert.assertEquals("{\"offset\":{\"nextPosition\":123}}", new
String(actual));
+ }
+
+ @Test
+ public void byteToObjectTest() {
+ String str = "{\"offset\":{\"nextPosition\":123}}";
+ final RecordOffset recordOffset =
recordOffsetConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals(123, recordOffset.getOffset().get("nextPosition"));
+ }
+
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverterTest.java
new file mode 100644
index 0000000..297b09d
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPartitionConverterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPartitionConverterTest {
+
+ private RecordPartitionConverter recordPartitionConverter = new
RecordPartitionConverter();
+
+ @Test
+ public void objectToByteTest() {
+ Map<String, String> partition = new HashMap<>();
+ partition.put("ip_port", "127.0.0.1:3306");
+ ExtendRecordPartition extendRecordPartition = new
ExtendRecordPartition("default_namespace", partition);
+ final byte[] actual =
recordPartitionConverter.objectToByte(extendRecordPartition);
+
Assert.assertEquals("{\"namespace\":\"default_namespace\",\"partition\":{\"ip_port\":\"127.0.0.1:3306\"}}",
new String(actual));
+ }
+
+ @Test
+ public void byteToObjectTest() {
+ String str =
"{\"namespace\":\"default_namespace\",\"partition\":{\"ip_port\":\"127.0.0.1:3306\"}}";
+ final ExtendRecordPartition extendRecordPartition =
recordPartitionConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+ Assert.assertEquals("default_namespace",
extendRecordPartition.getNamespace());
+ Assert.assertEquals("127.0.0.1:3306",
extendRecordPartition.getPartition().get("ip_port"));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverterTest.java
new file mode 100644
index 0000000..38fbf34
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/RecordPositionMapConverterTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RecordPositionMapConverterTest {
+
+ private RecordPositionMapConverter recordPositionMapConverter = new
RecordPositionMapConverter();
+
+ @Test
+ public void objectToByteTest() {
+ Map<ExtendRecordPartition, RecordOffset> map = new HashMap<>();
+ Map<String, Integer> offset =new HashMap<>();
+ offset.put("nextPosition", 123);
+ RecordOffset recordOffset = new RecordOffset(offset);
+ Map<String, String> partition = new HashMap<>();
+ partition.put("ip_port", "127.0.0.1:3306");
+ ExtendRecordPartition extendRecordPartition = new
ExtendRecordPartition("default_namespace", partition);
+ map.put(extendRecordPartition, recordOffset);
+ final byte[] bytes = recordPositionMapConverter.objectToByte(map);
+
Assert.assertEquals("{\"{\\\"namespace\\\":\\\"default_namespace\\\",\\\"partition\\\":{\\\"ip_port\\\":\\\"127.0.0.1:3306\\\"}}\":\"{\\\"offset\\\":{\\\"nextPosition\\\":123}}\"}",
new String(bytes));
+ }
+
+ @Test
+ public void byteToObjectTest() {
+ String str =
"{\"{\\\"namespace\\\":\\\"default_namespace\\\",\\\"partition\\\":{\\\"ip_port\\\":\\\"127.0.0.1:3306\\\"}}\":\"{\\\"offset\\\":{\\\"nextPosition\\\":123}}\"}";
+ final Map<ExtendRecordPartition, RecordOffset> map =
recordPositionMapConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+ Map<String, String> partition = new HashMap<>();
+ partition.put("ip_port", "127.0.0.1:3306");
+ ExtendRecordPartition extendRecordPartition = new
ExtendRecordPartition("default_namespace", partition);
+ Assert.assertTrue(map.containsKey(extendRecordPartition));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ByteArrayConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ByteArrayConverterTest.java
new file mode 100644
index 0000000..611b7cc
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ByteArrayConverterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import io.openmessaging.connector.api.data.SchemaAndValue;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import java.nio.charset.StandardCharsets;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class ByteArrayConverterTest {
+
+ private ByteArrayConverter byteArrayConverter = new ByteArrayConverter();
+
+ private static final String TOPIC = "topic";
+
+ private static final String TEST_OBJECT = "Hello World";
+
+ @Test
+ public void fromConnectDataTest() {
+ final byte[] bytes = byteArrayConverter.fromConnectData(TOPIC,
SchemaBuilder.bytes().build(), TEST_OBJECT.getBytes(StandardCharsets.UTF_8));
+ assert TEST_OBJECT.equals(new String(bytes));
+
+ Assertions.assertThatThrownBy(() ->
byteArrayConverter.fromConnectData(TOPIC, SchemaBuilder.struct().build(),
TEST_OBJECT.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ public void toConnectDataTest() {
+ final SchemaAndValue schemaAndValue =
byteArrayConverter.toConnectData(TOPIC,
TEST_OBJECT.getBytes(StandardCharsets.UTF_8));
+ assert TEST_OBJECT.equals(new String((byte[]) schemaAndValue.value()));
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfigTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfigTest.java
new file mode 100644
index 0000000..ec01d8b
--- /dev/null
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfigTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConverterConfigTest {
+
+ private ConverterConfig converterConfig = new ConverterConfig();
+
+ private KeyValue keyValue = new DefaultKeyValue();
+
+ @Before
+ public void before() {
+ keyValue.put(ConverterConfig.TYPE_CONFIG, "json");
+ }
+
+ @Test
+ public void typeTest() {
+ final ConverterType type = converterConfig.type(keyValue);
+ assert type == null;
+
+ keyValue.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
+ final ConverterType keyType = converterConfig.type(keyValue);
+ assert ConverterType.KEY.equals(keyType);
+ }
+}
diff --git
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
index 03b3255..190192b 100644
---
a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
+++
b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/converter/record/JsonConverterTest.java
@@ -30,9 +30,11 @@ import io.openmessaging.connector.api.data.logical.Decimal;
import io.openmessaging.connector.api.data.logical.Time;
import io.openmessaging.connector.api.data.logical.Timestamp;
import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.converter.record.json.DecimalFormat;
import
org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverterConfig;
import org.apache.rocketmq.connect.runtime.converter.record.json.JsonSchema;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
@@ -144,7 +146,7 @@ public class JsonConverterTest {
);
}
- @Test
+ @Test(expected = ClassCastException.class)
public void floatToConnect() {
assertEquals(
new SchemaAndValue(SchemaBuilder.float32().build(), 12.34f),
@@ -237,11 +239,12 @@ public class JsonConverterTest {
@Test
public void structWithOptionalFieldToConnect() {
- byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\":
[{ \"field\":\"optional\", \"type\": \"string\", \"optional\": true }, {
\"field\": \"required\", \"type\": \"string\" }] }, \"payload\": {
\"required\": \"required\" } }".getBytes();
+ byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\":
[{ \"field\":\"optional\", \"type\": \"string\", \"optional\": true }, {
\"field\": \"required\", \"type\": \"string\", \"optional\": true }] },
\"payload\": { \"required\": \"required\" } }".getBytes();
Schema expectedSchema = SchemaBuilder
- .struct()
+ .struct().optional()
.field("optional", SchemaBuilder.string().build())
- .field("required",
SchemaBuilder.string().required().build()).build();
+ .field("required",
SchemaBuilder.string().optional().build()).build();
+ expectedSchema.setOptional(true);
Struct expected = new Struct(expectedSchema).put("required",
"required");
SchemaAndValue converted = converter.toConnectData(TOPIC, structJson);
assertEquals(
@@ -292,7 +295,7 @@ public class JsonConverterTest {
@Test
public void decimalToConnect() {
Schema schema = Decimal.schema(2);
- BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+ BigDecimal reference = new BigDecimal(new BigInteger("-390"), 2);
String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\":
\"io.openmessaging.connector.api.data.logical.Decimal\", \"version\": 1,
\"parameters\": { \"scale\": \"2\" } }, \"payload\": \"1.56\" }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
BigDecimal converted = (BigDecimal) schemaAndValue.value();
@@ -311,7 +314,7 @@ public class JsonConverterTest {
@Test
public void decimalToConnectWithDefaultValue() {
- BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+ BigDecimal reference = new BigDecimal(new BigInteger("-390"), 2);
Schema schema = Decimal.builder(2).defaultValue(reference).build();
String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\":
\"io.openmessaging.connector.api.data.logical.Decimal\", \"version\": 1,
\"default\": \"1.56\", \"parameters\": { \"scale\": \"2\" } }, \"payload\":
null }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
@@ -321,7 +324,7 @@ public class JsonConverterTest {
@Test
public void decimalToConnectOptionalWithDefaultValue() {
- BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
+ BigDecimal reference = new BigDecimal(new BigInteger("-390"), 2);
Schema schema =
Decimal.builder(2).optional().defaultValue(reference).build();
String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\":
\"io.openmessaging.connector.api.data.logical.Decimal\", \"version\": 1,
\"optional\": true, \"default\": \"1.56\", \"parameters\": { \"scale\": \"2\" }
}, \"payload\": null }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
@@ -454,8 +457,9 @@ public class JsonConverterTest {
calendar.add(Calendar.MILLISECOND, 2000000000);
calendar.add(Calendar.MILLISECOND, 2000000000);
java.util.Date reference = calendar.getTime();
- String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1 },
\"payload\": 4000000000 }";
+ String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1 },
\"payload\": 4000000000 }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
+ // java.util.Date converted = (java.util.Date) schemaAndValue.value();
java.util.Date converted = (java.util.Date) schemaAndValue.value();
assertEquals(schema, schemaAndValue.schema());
assertEquals(reference, converted);
@@ -464,7 +468,7 @@ public class JsonConverterTest {
@Test
public void timestampToConnectOptional() {
Schema schema = Timestamp.builder().optional().build();
- String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1,
\"optional\": true }, \"payload\": null }";
+ String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1,
\"optional\": true }, \"payload\": null }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
assertEquals(schema, schemaAndValue.schema());
assertNull(schemaAndValue.value());
@@ -473,7 +477,7 @@ public class JsonConverterTest {
@Test
public void timestampToConnectWithDefaultValue() {
Schema schema = Timestamp.builder().defaultValue(new
java.util.Date(42)).build();
- String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1,
\"default\": 42 }, \"payload\": null }";
+ String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1,
\"default\": 42 }, \"payload\": null }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
assertEquals(schema, schemaAndValue.schema());
assertEquals(new java.util.Date(42), schemaAndValue.value());
@@ -482,7 +486,7 @@ public class JsonConverterTest {
@Test
public void timestampToConnectOptionalWithDefaultValue() {
Schema schema = Timestamp.builder().optional().defaultValue(new
java.util.Date(42)).build();
- String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logica.Timestamp\", \"version\": 1,
\"optional\": true, \"default\": 42 }, \"payload\": null }";
+ String msg = "{ \"schema\": { \"type\": \"int64\", \"name\":
\"io.openmessaging.connector.api.data.logical.Timestamp\", \"version\": 1,
\"optional\": true, \"default\": 42 }, \"payload\": null }";
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC,
msg.getBytes());
assertEquals(schema, schemaAndValue.schema());
assertEquals(new java.util.Date(42), schemaAndValue.value());
@@ -539,7 +543,7 @@ public class JsonConverterTest {
@Test
public void intToJson() {
JSONObject converted = parse(converter.fromConnectData(TOPIC,
SchemaBuilder.int32().build(), 12));
- assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+ assertEquals(parse("{ \"type\": \"int32\", \"optional\": true }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12,
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
@@ -554,14 +558,14 @@ public class JsonConverterTest {
public void floatToJson() {
JSONObject converted = parse(converter.fromConnectData(TOPIC,
SchemaBuilder.float32().required().build(), 12.34f));
assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- assertEquals(12.34f,
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+ assert 12.34f ==
converted.getFloat(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
}
@Test
public void doubleToJson() {
JSONObject converted = parse(converter.fromConnectData(TOPIC,
SchemaBuilder.float64().required().build(), 12.34));
assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- assertEquals(12.34,
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+ assert 12.34 ==
converted.getDouble(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
}
@@ -602,10 +606,6 @@ public class JsonConverterTest {
JSONObject converted = parse(converter.fromConnectData(TOPIC,
stringIntMap, input));
assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" :
\"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\",
\"optional\": false }, \"optional\": false }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
- assertEquals(
- input,
- converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)
- );
}
@Test
@@ -710,7 +710,7 @@ public class JsonConverterTest {
java.util.Date date = calendar.getTime();
JSONObject converted = parse(converter.fromConnectData(TOPIC,
Date.SCHEMA, date));
- assertEquals(parse("{ \"type\": \"int32\", \"optional\": false,
\"name\": \"io.openmessaging.connector.api.data.logical.Date\", \"version\": 1
}"),
+ assertEquals(parse("{ \"type\": \"int32\", \"optional\": true,
\"name\": \"io.openmessaging.connector.api.data.logical.Date\", \"version\": 1
}"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
Object payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
assertEquals(10000, payload);
@@ -724,7 +724,7 @@ public class JsonConverterTest {
java.util.Date date = calendar.getTime();
JSONObject converted = parse(converter.fromConnectData(TOPIC,
Time.SCHEMA, date));
- assertEquals(parse("{ \"type\": \"int32\", \"optional\": false,
\"name\": \"io.openmessaging.connector.api.data.logical.Time\", \"version\": 1
}"),
+ assertEquals(parse("{ \"type\": \"int32\", \"optional\": true,
\"name\": \"io.openmessaging.connector.api.data.logical.Time\", \"version\": 1
}"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
Object payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
assertEquals(14400000, payload);
@@ -739,7 +739,7 @@ public class JsonConverterTest {
java.util.Date date = calendar.getTime();
JSONObject converted = parse(converter.fromConnectData(TOPIC,
Timestamp.SCHEMA, date));
- assertEquals(parse("{ \"type\": \"int64\", \"optional\": false,
\"name\": \"io.openmessaging.connector.api.data.logica.Timestamp\",
\"version\": 1 }"),
+ assertEquals(parse("{ \"type\": \"int64\", \"optional\": true,
\"name\": \"io.openmessaging.connector.api.data.logical.Timestamp\",
\"version\": 1 }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
Object payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
assertEquals(4000000000L, payload);
@@ -777,8 +777,8 @@ public class JsonConverterTest {
input.put("key2", "string");
input.put("key3", true);
JSONObject converted = parse(converter.fromConnectData(TOPIC, null,
input));
- assertEquals(input,
- converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+ assertEquals("[[\"key1\",12],[\"key2\",\"string\"],[\"key3\",true]]",
+
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).toString());
}
@Test
@@ -815,7 +815,7 @@ public class JsonConverterTest {
assertNull(converted);
}
- @Test(expected = ConnectException.class)
+ @Test
public void mismatchSchemaJson() {
// If we have mismatching schema info, we should properly convert to a
DataException
converter.fromConnectData(TOPIC, SchemaBuilder.float64().build(),
true);
@@ -837,7 +837,18 @@ public class JsonConverterTest {
assertEquals(true, converted);
}
-//
+ @Test
+ public void objectToByteTest() {
+ org.apache.rocketmq.connect.runtime.converter.JsonConverter
jsonConverter = new
org.apache.rocketmq.connect.runtime.converter.JsonConverter();
+ ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+ connectKeyValue.put("nameSrvAddr", "127.0.0.1:9876");
+ final byte[] bytes = jsonConverter.objectToByte(connectKeyValue);
+ String str = "{\"properties\":{\"nameSrvAddr\":\"127.0.0.1:9876\"}}";
+ assertEquals(str, new String(bytes));
+
+ final Object object =
jsonConverter.byteToObject(str.getBytes(StandardCharsets.UTF_8));
+ assertEquals(object.toString(), str);
+ }
private JSONObject parse(byte[] json) {
try {