This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dba59007b6 [INLONG-9962][Manager] Data preview supports returning
header and specific field information (#9967)
dba59007b6 is described below
commit dba59007b6641d31d10a6157a8e07f628082a8f7
Author: fuweng11 <[email protected]>
AuthorDate: Wed Apr 10 21:39:37 2024 +0800
[INLONG-9962][Manager] Data preview supports returning header and specific
field information (#9967)
* [INLONG-9962][Manager] Data preview supports returning header and
specific field information
* [INLONG-9962][Manager] Data preview supports returning header and
specific field information
* [INLONG-9962][Manager] Fix error
* [INLONG-9962][Manager] Fix error
* [INLONG-9962][Manager] Fix error
---
.../apache/inlong/common/enums/DataTypeEnum.java | 20 ++++++-
.../dataproxy/config/holder/MetaConfigHolder.java | 6 +-
.../dataproxy/config/pojo/IdTopicConfig.java | 7 ++-
.../manager/pojo/consume/BriefMQMessage.java | 11 ++++
.../service/datatype/CsvDataTypeOperator.java | 63 +++++++++++++++++++
.../manager/service/datatype/DataTypeOperator.java | 56 +++++------------
.../service/datatype/DataTypeOperatorFactory.java | 49 +++++++++++++++
.../message/InlongMsgDeserializeOperator.java | 33 +++++++++-
.../service/message/RawMsgDeserializeOperator.java | 32 +++++++++-
.../resource/queue/kafka/KafkaOperator.java | 10 ++--
.../resource/queue/pulsar/PulsarOperator.java | 4 ++
.../resource/queue/tubemq/TubeMQOperator.java | 10 ++--
.../resource/queue/kafka/KafkaOperatorTest.java | 4 ++
.../sort/standalone/config/pojo/type/DataType.java | 70 ----------------------
.../sort/standalone/sink/kafka/KafkaIdConfig.java | 12 ++--
.../standalone/sink/pulsar/PulsarIdConfig.java | 12 ++--
16 files changed, 256 insertions(+), 143 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 2d2d8d1997..f327508660 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -30,8 +30,10 @@ public enum DataTypeEnum {
CANAL("canal"),
DEBEZIUM_JSON("debezium_json"),
RAW("raw"),
-
- ;
+ TEXT("text"),
+ PB("pb"),
+ JCE("jce"),
+ UNKNOWN("n");
private final String type;
@@ -48,6 +50,20 @@ public enum DataTypeEnum {
throw new IllegalArgumentException("Unsupported data type for " +
type);
}
+ /**
+ * Lenient lookup: match the given string exactly against each constant's type.
+ * Unlike the strict lookup above, which throws IllegalArgumentException for an
+ * unsupported type, this falls back to UNKNOWN.
+ *
+ * @param value type string to look up, e.g. "text" or "raw"; may be null
+ * @return the matching constant, or {@link #UNKNOWN} when none matches
+ */
+ public static DataTypeEnum convert(String value) {
+ for (DataTypeEnum v : values()) {
+ if (v.getType().equals(value)) {
+ return v;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ /**
+ * Render as {@code NAME:type}, e.g. {@code TEXT:text}.
+ */
+ @Override
+ public String toString() {
+ return this.name() + ":" + this.type;
+ }
+
public String getType() {
return type;
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
index 369e7b2212..96c5bd820a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/MetaConfigHolder.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.config.holder;
import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
@@ -29,7 +30,6 @@ import org.apache.inlong.dataproxy.config.ConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.CacheType;
-import org.apache.inlong.dataproxy.config.pojo.DataType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.sdk.commons.protocol.InlongId;
@@ -407,8 +407,8 @@ public class MetaConfigHolder extends ConfigHolder {
tmpConfig.setTenantAndNameSpace(tenant, nameSpace);
tmpConfig.setTopicName(topicName);
tmpConfig.setParams(idObject.getParams());
- tmpConfig.setDataType(DataType.convert(
- idObject.getParams().getOrDefault("dataType",
DataType.TEXT.value())));
+ tmpConfig.setDataType(DataTypeEnum.convert(
+ idObject.getParams().getOrDefault("dataType",
DataTypeEnum.TEXT.getType())));
tmpConfig.setFieldDelimiter(idObject.getParams().getOrDefault("fieldDelimiter",
"|"));
tmpConfig.setFileDelimiter(idObject.getParams().getOrDefault("fileDelimiter",
"\n"));
tmpConfig.setUseExtendedFields(Boolean.valueOf(
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
index 31646fd3b5..48b19d446d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.config.pojo;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.apache.commons.lang.StringUtils;
@@ -37,7 +38,7 @@ public class IdTopicConfig {
private String topicName;
private String tenant;
private String nameSpace;
- private DataType dataType = DataType.TEXT;
+ private DataTypeEnum dataType = DataTypeEnum.TEXT;
private String fieldDelimiter = "|";
private String fileDelimiter = "\n";
private Boolean useExtendedFields = false;
@@ -142,7 +143,7 @@ public class IdTopicConfig {
* get dataType
* @return the dataType
*/
- public DataType getDataType() {
+ public DataTypeEnum getDataType() {
return dataType;
}
@@ -150,7 +151,7 @@ public class IdTopicConfig {
* set dataType
* @param dataType the dataType to set
*/
- public void setDataType(DataType dataType) {
+ public void setDataType(DataTypeEnum dataType) {
this.dataType = dataType;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
index 51085c5570..72dd4f6303 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.pojo.consume;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
@@ -24,6 +26,9 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.util.List;
+import java.util.Map;
+
/**
* Brief Message info for MQ
*/
@@ -49,7 +54,13 @@ public class BriefMQMessage {
@ApiModelProperty(value = "Client ip")
private String clientIp;
+ @ApiModelProperty(value = "Message header")
+ private Map<String, String> headers;
+
@ApiModelProperty(value = "Message body")
private String body;
+ @ApiModelProperty(value = "List of field info")
+ private List<StreamField> fieldList;
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
new file mode 100644
index 0000000000..5bb0b969f6
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Data type operator for CSV message bodies.
+ *
+ * <p>Splits the body by the stream's data separator and fills each configured stream field
+ * with the column at the same position.
+ */
+@Slf4j
+@Service
+public class CsvDataTypeOperator implements DataTypeOperator {
+
+    /** Separator used when the stream does not configure one. */
+    private static final char DEFAULT_SEPARATOR = '|';
+
+    @Override
+    public boolean accept(DataTypeEnum type) {
+        return DataTypeEnum.CSV.equals(type);
+    }
+
+    /**
+     * Parse a CSV message body into a copy of the stream's field list with the column values filled in.
+     *
+     * <p>{@link InlongStreamInfo#getDataSeparator()} is parsed as the decimal code of the separator
+     * character (e.g. {@code "124"} for {@code '|'}); {@code '|'} is used when it is blank. Empty
+     * columns are kept, so {@code "a||c"} yields three values.
+     *
+     * @param str raw message body
+     * @param streamInfo inlong stream info providing the field list and the separator
+     * @return a copy of the stream's field list; field values are not filled from the body when it is
+     *         null, the separator cannot be parsed, or the column count differs from the field count
+     */
+    @Override
+    public List<StreamField> parseFields(String str, InlongStreamInfo streamInfo) throws Exception {
+        List<StreamField> streamFields = CommonBeanUtils.copyListProperties(streamInfo.getFieldList(),
+                StreamField::new);
+        if (str == null || streamFields == null) {
+            return streamFields;
+        }
+        try {
+            char separator = DEFAULT_SEPARATOR;
+            if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
+                separator = (char) Integer.parseInt(streamInfo.getDataSeparator());
+            }
+            // splitPreserveAllTokens keeps empty columns ("a||c" -> ["a", "", "c"]); StringUtils.split
+            // would collapse adjacent separators and misalign every value after an empty column
+            String[] values = StringUtils.splitPreserveAllTokens(str, separator);
+            if (values.length != streamFields.size()) {
+                log.debug("column count {} does not match field count {} for groupId = {}, streamId = {}",
+                        values.length, streamFields.size(), streamInfo.getInlongGroupId(),
+                        streamInfo.getInlongStreamId());
+                return streamFields;
+            }
+            for (int i = 0; i < values.length; i++) {
+                streamFields.get(i).setFieldValue(values[i]);
+            }
+        } catch (Exception e) {
+            log.warn("parse fields failed for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(),
+                    streamInfo.getInlongStreamId(), e);
+        }
+        return streamFields;
+    }
+}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
similarity index 51%
rename from
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
rename to
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
index 18491e7343..f2e42b2171 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java
@@ -15,56 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.dataproxy.config.pojo;
+package org.apache.inlong.manager.service.datatype;
-/**
- * data content type
- */
-public enum DataType {
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
- TEXT("text"), PB("pb"), JCE("jce"), N("n"), CSV("csv"), KV("kv");
+import java.util.List;
- private final String value;
+/**
+ * Data type operator
+ */
+public interface DataTypeOperator {
/**
- *
- * Constructor
- *
- * @param value
+ * Determines whether the current instance matches the specified type.
+ *
+ * @param type data type to check
+ * @return true if this operator can parse messages of the given data type
*/
- private DataType(String value) {
- this.value = value;
- }
+ boolean accept(DataTypeEnum type);
/**
- * value
+ * Parse fields from message
+ *
+ * <p>The default implementation does not look at the message and returns the stream's field
+ * list as-is; implementations such as the CSV operator fill in field values from the message.
*
- * @return
+ * @param message message body as a string
+ * @param streamInfo inlong stream info
+ * @return list of field info
+ * @throws Exception if an implementation fails to parse the message
*/
- public String value() {
- return this.value;
+ default List<StreamField> parseFields(String message, InlongStreamInfo
streamInfo) throws Exception {
+ return streamInfo.getFieldList();
}
- /**
- * toString
- */
- @Override
- public String toString() {
- return this.name() + ":" + this.value;
- }
-
- /**
- * convert
- *
- * @param value
- * @return
- */
- public static DataType convert(String value) {
- for (DataType v : values()) {
- if (v.value().equals(value)) {
- return v;
- }
- }
- return N;
- }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java
new file mode 100644
index 0000000000..8b36803ffb
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperatorFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.datatype;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link DataTypeOperator}.
+ */
+@Service
+public class DataTypeOperatorFactory {
+
+    /**
+     * Pass-through operator for data types without a dedicated implementation: it accepts every
+     * type and inherits the default {@code parseFields}, which returns the stream's field list.
+     */
+    private static final DataTypeOperator DEFAULT_OPERATOR = type -> true;
+
+    @Autowired
+    private List<DataTypeOperator> operatorList;
+
+    /**
+     * Get a data type operator instance via the given data type.
+     *
+     * <p>Only some data types have a dedicated operator (e.g. {@link CsvDataTypeOperator}). Falling
+     * back to a pass-through operator keeps message preview working for every other type instead
+     * of failing the whole request.
+     *
+     * @param type data type of the stream
+     * @return the first registered operator accepting {@code type}, or the pass-through operator
+     */
+    public DataTypeOperator getInstance(DataTypeEnum type) {
+        return operatorList.stream()
+                .filter(inst -> inst.accept(type))
+                .findFirst()
+                .orElse(DEFAULT_OPERATOR);
+    }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index 0e1b2a0a56..16a268428d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -17,14 +17,20 @@
package org.apache.inlong.manager.service.message;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.util.StringUtil;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.Charset;
@@ -38,6 +44,9 @@ import java.util.Objects;
@Service
public class InlongMsgDeserializeOperator implements DeserializeOperator {
+ @Autowired
+ public DataTypeOperatorFactory dataTypeOperatorFactory;
+
@Override
public boolean accept(MessageWrapType type) {
return MessageWrapType.INLONG_MSG_V0.equals(type);
@@ -71,9 +80,27 @@ public class InlongMsgDeserializeOperator implements
DeserializeOperator {
if (Objects.isNull(bodyBytes)) {
continue;
}
- BriefMQMessage message = new BriefMQMessage(index, groupId,
streamId, msgTime, attrMap.get(CLIENT_IP),
- new String(bodyBytes,
Charset.forName(streamInfo.getDataEncoding())));
- messageList.add(message);
+ try {
+ String body = new String(bodyBytes,
Charset.forName(streamInfo.getDataEncoding()));
+ DataTypeOperator dataTypeOperator =
+
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
+ List<StreamField> streamFieldList =
dataTypeOperator.parseFields(body, streamInfo);
+ BriefMQMessage message = BriefMQMessage.builder()
+ .id(index)
+ .inlongGroupId(groupId)
+ .inlongStreamId(streamId)
+ .dt(msgTime)
+ .clientIp(attrMap.get(CLIENT_IP))
+ .headers(headers)
+ .body(body)
+ .fieldList(streamFieldList)
+ .build();
+ messageList.add(message);
+ } catch (Exception e) {
+ String errMsg = String.format("decode msg failed for
groupId=%s, streamId=%s", groupId, streamId);
+ log.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
}
}
return messageList;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index 361afe46d6..6c6e2840e3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -17,12 +17,18 @@
package org.apache.inlong.manager.service.message;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.datatype.DataTypeOperator;
+import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.Charset;
@@ -34,6 +40,9 @@ import java.util.Map;
@Service
public class RawMsgDeserializeOperator implements DeserializeOperator {
+ @Autowired
+ public DataTypeOperatorFactory dataTypeOperatorFactory;
+
@Override
public boolean accept(MessageWrapType type) {
return MessageWrapType.RAW.equals(type);
@@ -45,8 +54,27 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
- return Collections.singletonList(new BriefMQMessage(index, groupId,
streamId, msgTime,
- headers.get(CLIENT_IP), new String(msgBytes,
Charset.forName(streamInfo.getDataEncoding()))));
+ String body = new String(msgBytes,
Charset.forName(streamInfo.getDataEncoding()));
+ try {
+ DataTypeOperator dataTypeOperator =
+
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
+ List<StreamField> streamFieldList =
dataTypeOperator.parseFields(body, streamInfo);
+ BriefMQMessage briefMQMessage = BriefMQMessage.builder()
+ .id(index)
+ .inlongGroupId(groupId)
+ .inlongStreamId(streamId)
+ .dt(msgTime)
+ .clientIp(headers.get(CLIENT_IP))
+ .headers(headers)
+ .body(body)
+ .fieldList(streamFieldList)
+ .build();
+ return Collections.singletonList(briefMQMessage);
+ } catch (Exception e) {
+ String errMsg = String.format("decode msg failed for groupId=%s,
streamId=%s", groupId, streamId);
+ log.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index deb6601b63..763a654c26 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -156,10 +156,12 @@ public class KafkaOperator {
headers.put(header.key(), new String(header.value(),
StandardCharsets.UTF_8));
}
- int wrapTypeId =
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
- DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
- MessageWrapType.valueOf(wrapTypeId));
+ MessageWrapType messageWrapType =
MessageWrapType.forType(streamInfo.getWrapType());
+ if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
+ messageWrapType =
+
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
+ }
+ DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
record.value(), headers, index));
if (messageList.size() >= messageCount) {
break;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index ccabb716f1..45e6112ce3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -48,6 +48,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -453,6 +454,9 @@ public class PulsarOperator {
messagePosition);
PulsarMessageInfo messageInfo =
PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
Map<String, String> headers = messageInfo.getProperties();
+ if (headers == null) {
+ headers = new HashMap<>();
+ }
MessageWrapType messageWrapType =
MessageWrapType.forType(streamInfo.getWrapType());
if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
messageWrapType =
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index e13b4cbe2c..cdbc4da86d 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -303,11 +303,13 @@ public class TubeMQOperator {
map.put(kv.split(InlongConstants.EQUAL)[0],
kv.split(InlongConstants.EQUAL)[1]);
}
- int wrapTypeId =
Integer.parseInt(map.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
+ MessageWrapType messageWrapType =
MessageWrapType.forType(streamInfo.getWrapType());
+ if (map.get(InlongConstants.MSG_ENCODE_VER) != null) {
+ messageWrapType =
+
MessageWrapType.valueOf(Integer.parseInt(map.get(InlongConstants.MSG_ENCODE_VER)));
+ }
byte[] messageData =
Base64.getDecoder().decode(tubeDataInfo.getData());
- DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(
- MessageWrapType.valueOf(wrapTypeId));
+ DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
messageList.addAll(deserializeOperator.decodeMsg(streamInfo,
messageData, map, index));
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
index d2cf618b3d..acc6ebb3f5 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.resource.queue.kafka;
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.enums.MessageWrapType;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -93,6 +95,8 @@ public class KafkaOperatorTest extends ServiceBaseTest {
@BeforeEach
public void setUp() {
streamInfo.setDataEncoding("UTF-8");
+ streamInfo.setDataType(DataTypeEnum.CSV.getType());
+ streamInfo.setWrapType(MessageWrapType.INLONG_MSG_V0.getName());
List<TopicPartition> topicPartitions = IntStream.range(0,
PARTITION_NUM)
.mapToObj(i -> new TopicPartition(TOPIC_NAME,
i)).collect(Collectors.toList());
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
deleted file mode 100644
index 61a1a4ac54..0000000000
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.config.pojo.type;
-
-/**
- * data content type
- */
-public enum DataType {
-
- TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n");
-
- private final String value;
-
- /**
- *
- * Constructor
- *
- * @param value
- */
- private DataType(String value) {
- this.value = value;
- }
-
- /**
- * value
- *
- * @return
- */
- public String value() {
- return this.value;
- }
-
- /**
- * toString
- */
- @Override
- public String toString() {
- return this.name() + ":" + this.value;
- }
-
- /**
- * convert
- *
- * @param value
- * @return
- */
- public static DataType convert(String value) {
- for (DataType v : values()) {
- if (v.value().equals(value)) {
- return v;
- }
- }
- return UNKNOWN;
- }
-}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index 3a527b23be..73d58afde0 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.sink.kafka;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.type.DataType;
import org.apache.inlong.sort.standalone.utils.Constants;
import java.util.Map;
@@ -38,7 +38,7 @@ public class KafkaIdConfig {
private String uid;
private String separator = "|";
private String topic;
- private DataType dataType = DataType.TEXT;
+ private DataTypeEnum dataType = DataTypeEnum.TEXT;
/**
* Constructor
@@ -58,8 +58,8 @@ public class KafkaIdConfig {
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.separator = idParam.getOrDefault(KafkaIdConfig.KEY_SEPARATOR,
KafkaIdConfig.DEFAULT_SEPARATOR);
this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
- this.dataType = DataType
- .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE,
DataType.TEXT.value()));
+ this.dataType = DataTypeEnum
+ .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE,
DataTypeEnum.TEXT.getType()));
}
/**
@@ -157,7 +157,7 @@ public class KafkaIdConfig {
*
* @return the dataType
*/
- public DataType getDataType() {
+ public DataTypeEnum getDataType() {
return dataType;
}
@@ -166,7 +166,7 @@ public class KafkaIdConfig {
*
* @param dataType the dataType to set
*/
- public void setDataType(DataType dataType) {
+ public void setDataType(DataTypeEnum dataType) {
this.dataType = dataType;
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 52377d2461..4ef3aece80 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
+import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.config.pojo.type.DataType;
import org.apache.inlong.sort.standalone.utils.Constants;
import java.util.Map;
@@ -40,7 +40,7 @@ public class PulsarIdConfig {
private String uid;
private String separator = "|";
private String topic;
- private DataType dataType = DataType.TEXT;
+ private DataTypeEnum dataType = DataTypeEnum.TEXT;
/**
* Constructor
@@ -60,8 +60,8 @@ public class PulsarIdConfig {
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR,
PulsarIdConfig.DEFAULT_SEPARATOR);
this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
- this.dataType = DataType
- .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE,
DataType.TEXT.value()));
+ this.dataType = DataTypeEnum
+ .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE,
DataTypeEnum.TEXT.getType()));
}
/**
@@ -159,7 +159,7 @@ public class PulsarIdConfig {
*
* @return the dataType
*/
- public DataType getDataType() {
+ public DataTypeEnum getDataType() {
return dataType;
}
@@ -168,7 +168,7 @@ public class PulsarIdConfig {
*
* @param dataType the dataType to set
*/
- public void setDataType(DataType dataType) {
+ public void setDataType(DataTypeEnum dataType) {
this.dataType = dataType;
}