This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73945a3f3 [INLONG-7690][Manager] Creating schema of StreamSource by
CSV (#7740)
73945a3f3 is described below
commit 73945a3f37162c3e121f52b1a0d33977bf957fff
Author: feat <[email protected]>
AuthorDate: Mon Apr 3 15:51:25 2023 +0800
[INLONG-7690][Manager] Creating schema of StreamSource by CSV (#7740)
---
.../manager/common/consts/InlongConstants.java | 13 ++++
.../service/sink/StreamSinkServiceImpl.java | 44 ++++++++++++-
.../service/stream/InlongStreamServiceImpl.java | 74 ++++++++++++++++++----
3 files changed, 118 insertions(+), 13 deletions(-)
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 708042fca..267edd393 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -17,6 +17,11 @@
package org.apache.inlong.manager.common.consts;
+import com.google.common.collect.Sets;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.regex.Pattern;
+
/**
* Global constant for the Inlong system.
*/
@@ -49,6 +54,8 @@ public class InlongConstants {
public static final String QUESTION_MARK = "?";
+ public static final String NEW_LINE = "\n";
+
public static final String ADMIN_USER = "admin";
public static final Integer AFFECTED_ONE_ROW = 1;
@@ -150,7 +157,13 @@ public class InlongConstants {
public static final String STATEMENT_TYPE_SQL = "sql";
public static final String STATEMENT_TYPE_JSON = "json";
+ public static final String STATEMENT_TYPE_CSV = "csv";
public static final String SORT_TYPE_INFO_SUFFIX = "TypeInfo";
+    /**
+     * Matches names made up only of ASCII letters, digits and underscores.
+     *
+     * <p>Note: because of the {@code *} quantifier this pattern also matches the empty string, so callers
+     * that require a non-empty name must check for emptiness separately.
+     */
+    public static final Pattern PATTERN_NORMAL_CHARACTERS = Pattern.compile("^[a-zA-Z0-9_]*$");
+
+    /**
+     * Field types accepted for inlong stream fields.
+     *
+     * <p>Exposed as an unmodifiable view: this is a shared public constant, and a plain mutable set would let
+     * any caller silently change the global whitelist.
+     */
+    public static final Set<String> STREAM_FORMAT_TYPES = Collections.unmodifiableSet(
+            Sets.newHashSet("string", "int", "long", "float", "double", "date", "timestamp"));
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index e11532de7..5db13d989 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -82,7 +82,9 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.inlong.manager.common.consts.InlongConstants.LEFT_BRACKET;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
/**
* Implementation of sink service interface
@@ -91,6 +93,9 @@ import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_
public class StreamSinkServiceImpl implements StreamSinkService {
private static final Logger LOGGER =
LoggerFactory.getLogger(StreamSinkServiceImpl.class);
+ private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,";
+ private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
+ private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
@Autowired
private SinkOperatorFactory operatorFactory;
@@ -708,8 +713,10 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
- } else {
+ } else if (STATEMENT_TYPE_SQL.equals(method)) {
fieldsMap = parseFieldsBySql(statement);
+ } else {
+ return parseFieldsByCsv(statement);
}
return fieldsMap.entrySet().stream().map(entry -> {
SinkField field = new SinkField();
@@ -725,6 +732,41 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
}
}
+    /**
+     * Parse sink fields from a CSV-like statement.
+     *
+     * <p>Each non-blank line describes one field as {@code name type [comment]}. Columns are separated by one
+     * or more tab, whitespace or comma characters. Everything after the second run of separators is kept as
+     * the comment, so a comment may itself contain spaces or commas.
+     *
+     * @param statement CSV statement with one field per line; {@code null} or blank yields an empty list
+     * @return the parsed fields, in the order they appear in the statement
+     * @throws BusinessException if a line lacks a name or type, or the field name has invalid characters
+     */
+    private List<SinkField> parseFieldsByCsv(String statement) {
+        List<SinkField> fields = new ArrayList<>();
+        if (StringUtils.isBlank(statement)) {
+            return fields;
+        }
+        // Treat a run of separators as one, so "id  int" or "id, int" do not produce empty columns.
+        String splitter = "(?:" + PARSE_FIELD_CSV_SPLITTER + ")+";
+        String[] lines = statement.split(InlongConstants.NEW_LINE);
+        for (int i = 0; i < lines.length; i++) {
+            if (StringUtils.isBlank(lines[i])) {
+                continue;
+            }
+            int lineNumber = i + 1;
+            // trim() also removes the '\r' left over from Windows (CRLF) line endings.
+            String line = lines[i].trim();
+            String[] cols = line.split(splitter, PARSE_FIELD_CSV_MAX_COLUMNS);
+            // A trailing separator ("id,") yields an empty second column, which is not a usable type.
+            if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS || cols[1].isEmpty()) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                        "At least two fields are required, line number is " + lineNumber);
+            }
+            String fieldName = cols[0];
+            // PATTERN_NORMAL_CHARACTERS also matches "", so an empty name must be rejected explicitly.
+            if (fieldName.isEmpty() || !PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Field names in line " + lineNumber
+                        + " can only contain letters, underscores or numbers");
+            }
+            String fieldType = cols[1];
+
+            String comment = null;
+            if (cols.length == PARSE_FIELD_CSV_MAX_COLUMNS && !cols[PARSE_FIELD_CSV_MAX_COLUMNS - 1].isEmpty()) {
+                comment = cols[PARSE_FIELD_CSV_MAX_COLUMNS - 1];
+            }
+
+            SinkField field = new SinkField();
+            field.setFieldName(fieldName);
+            field.setFieldType(fieldType);
+            field.setFieldComment(comment);
+            fields.add(field);
+        }
+        return fields;
+    }
+
private Map<String, String> parseFieldsBySql(String sql) throws
JSQLParserException {
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index e50a4c4be..2f19c6bb7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -82,7 +82,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NORMAL_CHARACTERS;
import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.STREAM_FORMAT_TYPES;
import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.packExtParams;
import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackExtParams;
@@ -93,6 +96,9 @@ import static
org.apache.inlong.manager.pojo.stream.InlongStreamExtParam.unpackE
public class InlongStreamServiceImpl implements InlongStreamService {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongStreamServiceImpl.class);
+ private static final String PARSE_FIELD_CSV_SPLITTER = "\t|\\s|,";
+ private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
+ private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
@Autowired
private InlongStreamEntityMapper streamMapper;
@@ -112,7 +118,6 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
private UserService userService;
@Transactional(rollbackFor = Throwable.class)
-
@Override
public Integer save(InlongStreamRequest request, String operator) {
LOGGER.debug("begin to save inlong stream info={}", request);
@@ -222,8 +227,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
streamInfo.setFieldList(streamFields);
// load ext infos
List<InlongStreamExtEntity> extEntities =
streamExtMapper.selectByRelatedId(groupId, streamId);
- List<InlongStreamExtInfo> exts =
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
- streamInfo.setExtList(exts);
+ List<InlongStreamExtInfo> extInfos =
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+ streamInfo.setExtList(extInfos);
// load extParams
unpackExtParams(streamEntity.getExtParams(), streamInfo);
@@ -255,8 +260,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
List<StreamField> streamFields = getStreamFields(groupId, streamId);
streamInfo.setFieldList(streamFields);
List<InlongStreamExtEntity> extEntities =
streamExtMapper.selectByRelatedId(groupId, streamId);
- List<InlongStreamExtInfo> exts =
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
- streamInfo.setExtList(exts);
+ List<InlongStreamExtInfo> extInfos =
CommonBeanUtils.copyListProperties(extEntities, InlongStreamExtInfo::new);
+ streamInfo.setExtList(extInfos);
List<StreamSink> sinkList = sinkService.listSink(groupId, streamId);
streamInfo.setSinkList(sinkList);
List<StreamSource> sourceList = sourceService.listSource(groupId,
streamId);
@@ -733,8 +738,10 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
Map<String, String> fieldsMap;
if (STATEMENT_TYPE_JSON.equals(method)) {
fieldsMap = parseFieldsByJson(statement);
- } else {
+ } else if (STATEMENT_TYPE_SQL.equals(method)) {
fieldsMap = parseFieldsBySql(statement);
+ } else {
+ return parseFieldsByCsv(statement);
}
return fieldsMap.entrySet().stream().map(entry -> {
StreamField field = new StreamField();
@@ -749,7 +756,49 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
String.format("parse stream fields error : %s",
e.getMessage()));
}
}
+
+    /**
+     * Parse stream fields from a CSV-like statement.
+     *
+     * <p>Each non-blank line describes one field as {@code name type [comment]}. Columns are separated by one
+     * or more tab, whitespace or comma characters. Everything after the second run of separators is kept as
+     * the comment, so a comment may itself contain spaces or commas. The type must be one of
+     * {@code STREAM_FORMAT_TYPES} (case-sensitive).
+     *
+     * @param statement CSV statement with one field per line; {@code null} or blank yields an empty list
+     * @return the parsed fields, in the order they appear in the statement
+     * @throws BusinessException if a line lacks a name or type, the field name has invalid characters,
+     *         or the field type is not supported
+     */
+    private List<StreamField> parseFieldsByCsv(String statement) {
+        List<StreamField> fields = new ArrayList<>();
+        if (StringUtils.isBlank(statement)) {
+            return fields;
+        }
+        // Treat a run of separators as one, so "id  int" or "id, int" do not produce empty columns.
+        String splitter = "(?:" + PARSE_FIELD_CSV_SPLITTER + ")+";
+        String[] lines = statement.split(InlongConstants.NEW_LINE);
+        for (int i = 0; i < lines.length; i++) {
+            if (StringUtils.isBlank(lines[i])) {
+                continue;
+            }
+            int lineNumber = i + 1;
+            // trim() also removes the '\r' left over from Windows (CRLF) line endings; otherwise a valid
+            // type such as "int\r" would fail the STREAM_FORMAT_TYPES check below.
+            String line = lines[i].trim();
+            String[] cols = line.split(splitter, PARSE_FIELD_CSV_MAX_COLUMNS);
+            // A trailing separator ("id,") yields an empty second column, which is not a usable type.
+            if (cols.length < PARSE_FIELD_CSV_MIN_COLUMNS || cols[1].isEmpty()) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                        "At least two fields are required, line number is " + lineNumber);
+            }
+            String fieldName = cols[0];
+            // PATTERN_NORMAL_CHARACTERS also matches "", so an empty name must be rejected explicitly.
+            if (fieldName.isEmpty() || !PATTERN_NORMAL_CHARACTERS.matcher(fieldName).matches()) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Field names in line " + lineNumber
+                        + " can only contain letters, underscores or numbers");
+            }
+            String fieldType = cols[1];
+            if (!STREAM_FORMAT_TYPES.contains(fieldType)) {
+                throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "The field type in line " + lineNumber
+                        + " must be one of " + STREAM_FORMAT_TYPES);
+            }
+
+            String comment = null;
+            if (cols.length == PARSE_FIELD_CSV_MAX_COLUMNS && !cols[PARSE_FIELD_CSV_MAX_COLUMNS - 1].isEmpty()) {
+                comment = cols[PARSE_FIELD_CSV_MAX_COLUMNS - 1];
+            }
+
+            StreamField field = new StreamField();
+            field.setFieldName(fieldName);
+            field.setFieldType(fieldType);
+            field.setFieldComment(comment);
+            fields.add(field);
+        }
+        return fields;
+    }
+
private Map<String, String> parseFieldsBySql(String sql) throws
JSQLParserException {
CCJSqlParserManager pm = new CCJSqlParserManager();
Statement statement = pm.parse(new StringReader(sql));
@@ -793,7 +842,7 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
* First physically delete the existing field information, and then add
the field information of this batch
*/
@Transactional(rollbackFor = Throwable.class)
- void updateField(String groupId, String streamId, List<StreamField>
fieldList) {
+ public void updateField(String groupId, String streamId, List<StreamField>
fieldList) {
LOGGER.debug("begin to update inlong stream field, groupId={},
streamId={}, field={}", groupId, streamId,
fieldList);
try {
@@ -807,7 +856,7 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
}
@Transactional(rollbackFor = Throwable.class)
- void saveField(String groupId, String streamId, List<StreamField>
infoList) {
+ public void saveField(String groupId, String streamId, List<StreamField>
infoList) {
if (CollectionUtils.isEmpty(infoList)) {
return;
}
@@ -823,14 +872,15 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
}
@Transactional(rollbackFor = Throwable.class)
- void saveOrUpdateExt(String groupId, String streamId,
List<InlongStreamExtInfo> exts) {
+ public void saveOrUpdateExt(String groupId, String streamId,
List<InlongStreamExtInfo> extInfos) {
LOGGER.info("begin to save or update inlong stream ext info,
groupId={}, streamId={}, ext={}", groupId,
- streamId, exts);
- if (CollectionUtils.isEmpty(exts)) {
+ streamId, extInfos);
+ if (CollectionUtils.isEmpty(extInfos)) {
return;
}
- List<InlongStreamExtEntity> entityList =
CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
+ List<InlongStreamExtEntity> entityList =
+ CommonBeanUtils.copyListProperties(extInfos,
InlongStreamExtEntity::new);
entityList.forEach(streamEntity -> {
streamEntity.setInlongGroupId(groupId);
streamEntity.setInlongStreamId(streamId);