This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e6ec72c684 [INLONG-10512][Manager] Support preview of data in kv data
type (#10513)
e6ec72c684 is described below
commit e6ec72c684ae51656014b9e38f83cacc62a1f680
Author: fuweng11 <[email protected]>
AuthorDate: Wed Jun 26 14:07:21 2024 +0800
[INLONG-10512][Manager] Support preview of data in kv data type (#10513)
* [INLONG-10512][Manager] Support preview of data in kv data type
---
.../manager/pojo/stream/BaseInlongStream.java | 2 +
.../manager/pojo/stream/InlongStreamExtParam.java | 6 ++
.../service/datatype/KvDataTypeOperator.java | 100 +++++++++++++++++++++
3 files changed, 108 insertions(+)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
index 4abd2f836f..d992b8141f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/BaseInlongStream.java
@@ -33,5 +33,7 @@ public class BaseInlongStream {
// you can add extend parameters in this class
private String predefinedFields;
+ private String kvSeparator;
+ private String lineSeparator;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
index 3358ab91eb..1af56c2769 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/InlongStreamExtParam.java
@@ -45,6 +45,12 @@ public class InlongStreamExtParam implements Serializable {
@ApiModelProperty(value = "Whether to ignore the parse errors of field
value")
private Boolean ignoreParseError;
+ @ApiModelProperty(value = "Kv separator")
+ private String kvSeparator;
+
+ @ApiModelProperty(value = "Line separator")
+ private String lineSeparator;
+
@ApiModelProperty(value = "If use extended fields")
private Boolean useExtendedFields = false;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
new file mode 100644
index 0000000000..79177f3d70
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Slf4j
+@Service
+public class KvDataTypeOperator implements DataTypeOperator {
+
+ @Override
+ public boolean accept(DataTypeEnum type) {
+ return DataTypeEnum.KV.equals(type);
+ }
+
+ @Override
+ public List<FieldInfo> parseFields(String str, InlongStreamInfo
streamInfo) throws Exception {
+ List<FieldInfo> fields =
CommonBeanUtils.copyListProperties(streamInfo.getFieldList(), FieldInfo::new);
+ try {
+ char separator = 0;
+ if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+ separator = (char)
Integer.parseInt(streamInfo.getDataSeparator());
+ }
+ char kvSeparator = '=';
+ if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) {
+ kvSeparator = (char)
Integer.parseInt(streamInfo.getKvSeparator());
+ }
+ String[] bodys = StringUtils.split(str, separator);
+ if (bodys.length != fields.size()) {
+ log.warn(
+ "The number of reported fields does not match the
number of stream fields for groupId={}, streamId={}, reported field size ={},
stream field size ={}",
+ streamInfo.getInlongGroupId(),
streamInfo.getInlongStreamId(), bodys.length, fields.size());
+ return fields;
+ }
+ for (int i = 0; i < bodys.length; i++) {
+ String body = bodys[i];
+ String[] values = StringUtils.split(body, kvSeparator);
+ fields.get(i).setFieldName(values[0]);
+ fields.get(i).setFieldValue(values[1]);
+ }
+ } catch (Exception e) {
+ log.warn("parse fields failed for groupId = {}, streamId = {}",
streamInfo.getInlongGroupId(),
+ streamInfo.getInlongStreamId(), e);
+ }
+ return fields;
+ }
+
+ @Override
+ public DataTypeConfig getDataTypeConfig(InlongStreamInfo streamInfo) {
+ char separator = 0;
+ if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+ separator = (char) Integer.parseInt(streamInfo.getDataSeparator());
+ }
+ Character escape = null;
+ if (streamInfo.getDataEscapeChar() != null) {
+ escape = streamInfo.getDataEscapeChar().charAt(0);
+ }
+ KvConfig kvConfig = new KvConfig();
+ char kvSeparator = '=';
+ Character lineSeparator = null;
+ if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) {
+ kvSeparator = (char) Integer.parseInt(streamInfo.getKvSeparator());
+ }
+ // row separator, which must be a field separator in the data flow
+ if (StringUtils.isNotBlank(streamInfo.getLineSeparator())) {
+ lineSeparator = (char)
Integer.parseInt(streamInfo.getLineSeparator());
+ }
+ kvConfig.setLineSeparator(lineSeparator);
+ kvConfig.setKvSplitter(kvSeparator);
+ kvConfig.setEntrySplitter(separator);
+ kvConfig.setEscapeChar(escape);
+ return kvConfig;
+ }
+}