This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f941953774 Fix StarRocksJsonSerializer will transform array/map/row to
string (#5281)
f941953774 is described below
commit f941953774702fa86ad344fe8fbfd5fafbf32db7
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Aug 11 15:21:36 2023 +0800
Fix StarRocksJsonSerializer will transform array/map/row to string (#5281)
---
.../serialize/StarRocksJsonSerializer.java | 19 ++++++--
.../serialize/StarRocksJsonSerializerTest.java | 56 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
index 0e6de2f601..5bf15c533a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java
@@ -19,9 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.utils.JsonUtils;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
public class StarRocksJsonSerializer extends StarRocksBaseSerializer
@@ -38,10 +39,22 @@ public class StarRocksJsonSerializer extends
StarRocksBaseSerializer
@Override
public String serialize(SeaTunnelRow row) {
- Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
+ Map<String, Object> rowMap = new
LinkedHashMap<>(row.getFields().length);
for (int i = 0; i < row.getFields().length; i++) {
- Object value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
+ SqlType sqlType = seaTunnelRowType.getFieldType(i).getSqlType();
+ Object value;
+ if (sqlType == SqlType.ARRAY
+ || sqlType == SqlType.MAP
+ || sqlType == SqlType.ROW
+ || sqlType == SqlType.MULTIPLE_ROW) {
+ // If the field type is a complex type, we should keep the original value.
+ // It will be transformed to a JSON string in the next step,
+ // JsonUtils.toJsonString(rowMap).
+ value = row.getField(i);
+ } else {
+ value = convert(seaTunnelRowType.getFieldType(i),
row.getField(i));
+ }
rowMap.put(seaTunnelRowType.getFieldName(i), value);
}
if (enableUpsertDelete) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
new file mode 100644
index 0000000000..6e0d947644
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+public class StarRocksJsonSerializerTest {
+
+ @Test
+ public void serialize() {
+ String[] filedNames = {"id", "name", "array", "map"};
+ SeaTunnelDataType<?>[] filedTypes = {
+ BasicType.LONG_TYPE,
+ BasicType.STRING_TYPE,
+ ArrayType.STRING_ARRAY_TYPE,
+ new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
+ };
+
+ SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(filedNames,
filedTypes);
+ StarRocksJsonSerializer starRocksJsonSerializer =
+ new StarRocksJsonSerializer(seaTunnelRowType, false);
+ Object[] fields = {
+ 1, "Tom", new String[] {"tag1", "tag2"},
Collections.singletonMap("key1", "value1")
+ };
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+ String jsonString = starRocksJsonSerializer.serialize(seaTunnelRow);
+ Assertions.assertEquals(
+
"{\"id\":1,\"name\":\"Tom\",\"array\":[\"tag1\",\"tag2\"],\"map\":{\"key1\":\"value1\"}}",
+ jsonString);
+ }
+}