This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 248d65d2de [INLONG-9098][Manager] Support to save additional info for
the Iceberg field (#9099)
248d65d2de is described below
commit 248d65d2de71bbca2dd13ee1db924c998fdad77c
Author: fuweng11 <[email protected]>
AuthorDate: Wed Oct 25 11:42:53 2023 +0800
[INLONG-9098][Manager] Support to save additional info for the Iceberg
field (#9099)
---
.../pojo/sink/iceberg/IcebergColumnInfo.java | 20 ++++---
.../resource/sink/iceberg/IcebergCatalogUtils.java | 49 ++++++++--------
.../sink/iceberg/IcebergResourceOperator.java | 6 +-
.../service/sink/iceberg/IcebergSinkOperator.java | 67 +++++++++++++++++++++-
.../source/iceberg/IcebergSourceOperator.java | 6 +-
5 files changed, 110 insertions(+), 38 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
index 10f3e6684b..b9ac57c07c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/iceberg/IcebergColumnInfo.java
@@ -17,13 +17,16 @@
package org.apache.inlong.manager.pojo.sink.iceberg;
+import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
@@ -32,10 +35,10 @@ import org.apache.commons.lang3.StringUtils;
* Iceberg column info
*/
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class IcebergColumnInfo {
+@JsonTypeDefine(value = SinkType.ICEBERG)
+public class IcebergColumnInfo extends SinkField {
@ApiModelProperty("Length of fixed type")
private Integer length;
@@ -55,12 +58,15 @@ public class IcebergColumnInfo {
@ApiModelProperty("Width param of truncate partition")
private Integer width;
- // The following are passed from base field and need not be part of API
for extra param
- private String name;
- private String type;
- private String desc;
private boolean required;
+ /**
+ * Create an {@link IcebergColumnInfo} from the generic sink field of a request.
+ * A fresh instance is supplied by {@code IcebergColumnInfo::new} and populated
+ * through {@code CommonBeanUtils.copyProperties}.
+ * NOTE(review): the trailing {@code true} flag presumably makes the copy skip
+ * null properties - confirm against CommonBeanUtils.
+ *
+ * @param sinkField the sink field carried by the request
+ * @return the Iceberg column info populated from {@code sinkField}
+ */
+ public static IcebergColumnInfo getFromRequest(SinkField sinkField) {
+ return CommonBeanUtils.copyProperties(sinkField,
IcebergColumnInfo::new, true);
+ }
+
/**
* Get the extra param from the Json
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
index 3a6847fed0..1cd10e12c0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergCatalogUtils.java
@@ -95,10 +95,10 @@ public class IcebergCatalogUtils {
int id = 1;
for (IcebergColumnInfo column : tableInfo.getColumns()) {
if (column.isRequired()) {
- nestedFields.add(Types.NestedField.required(id,
column.getName(),
+ nestedFields.add(Types.NestedField.required(id,
column.getFieldName(),
Types.fromPrimitiveString(icebergTypeDesc(column))));
} else {
- nestedFields.add(Types.NestedField.optional(id,
column.getName(),
+ nestedFields.add(Types.NestedField.optional(id,
column.getFieldName(),
Types.fromPrimitiveString(icebergTypeDesc(column))));
}
id += 1;
@@ -118,14 +118,14 @@ public class IcebergCatalogUtils {
* Transform to iceberg recognizable type description
*/
private static String icebergTypeDesc(IcebergColumnInfo column) {
- switch (IcebergType.forType(column.getType())) {
+ switch (IcebergType.forType(column.getFieldType())) {
case DECIMAL:
// note: the space is needed or iceberg won't recognize
return String.format("decimal(%d, %d)", column.getPrecision(),
column.getScale());
case FIXED:
return String.format("fixed(%d)", column.getLength());
default:
- return column.getType();
+ return column.getFieldType();
}
}
@@ -147,9 +147,9 @@ public class IcebergCatalogUtils {
Schema schema = table.schema();
for (NestedField column : schema.columns()) {
IcebergColumnInfo info = new IcebergColumnInfo();
- info.setName(column.name());
+ info.setFieldName(column.name());
info.setRequired(column.isRequired());
- info.setType(column.type().toString());
+ info.setFieldType(column.type().toString());
columnList.add(info);
}
return columnList;
@@ -167,11 +167,12 @@ public class IcebergCatalogUtils {
UpdateSchema updateSchema = table.updateSchema();
for (IcebergColumnInfo column : columns) {
if (column.isRequired()) {
- updateSchema.addRequiredColumn(column.getName(),
Types.fromPrimitiveString(icebergTypeDesc(column)),
- column.getDesc());
+ updateSchema.addRequiredColumn(column.getFieldName(),
+ Types.fromPrimitiveString(icebergTypeDesc(column)),
+ column.getFieldComment());
} else {
- updateSchema.addColumn(column.getName(),
Types.fromPrimitiveString(icebergTypeDesc(column)),
- column.getDesc());
+ updateSchema.addColumn(column.getFieldName(),
Types.fromPrimitiveString(icebergTypeDesc(column)),
+ column.getFieldComment());
}
}
@@ -202,25 +203,25 @@ public class IcebergCatalogUtils {
}
switch (IcebergPartition.forName(column.getPartitionStrategy())) {
case IDENTITY:
- builder.identity(column.getName());
+ builder.identity(column.getFieldName());
break;
case BUCKET:
- builder.bucket(column.getName(), column.getBucketNum());
+ builder.bucket(column.getFieldName(), column.getBucketNum());
break;
case TRUNCATE:
- builder.truncate(column.getName(), column.getWidth());
+ builder.truncate(column.getFieldName(), column.getWidth());
break;
case YEAR:
- builder.year(column.getName());
+ builder.year(column.getFieldName());
break;
case MONTH:
- builder.month(column.getName());
+ builder.month(column.getFieldName());
break;
case DAY:
- builder.day(column.getName());
+ builder.day(column.getFieldName());
break;
case HOUR:
- builder.hour(column.getName());
+ builder.hour(column.getFieldName());
break;
case NONE:
break;
@@ -241,25 +242,25 @@ public class IcebergCatalogUtils {
}
switch (IcebergPartition.forName(column.getPartitionStrategy())) {
case IDENTITY:
- builder.addField(column.getName());
+ builder.addField(column.getFieldName());
break;
case BUCKET:
- builder.addField(Expressions.bucket(column.getName(),
column.getBucketNum()));
+ builder.addField(Expressions.bucket(column.getFieldName(),
column.getBucketNum()));
break;
case TRUNCATE:
- builder.addField(Expressions.truncate(column.getName(),
column.getWidth()));
+ builder.addField(Expressions.truncate(column.getFieldName(),
column.getWidth()));
break;
case YEAR:
- builder.addField(Expressions.year(column.getName()));
+ builder.addField(Expressions.year(column.getFieldName()));
break;
case MONTH:
- builder.addField(Expressions.month(column.getName()));
+ builder.addField(Expressions.month(column.getFieldName()));
break;
case DAY:
- builder.addField(Expressions.day(column.getName()));
+ builder.addField(Expressions.day(column.getFieldName()));
break;
case HOUR:
- builder.addField(Expressions.hour(column.getName()));
+ builder.addField(Expressions.hour(column.getFieldName()));
break;
case NONE:
break;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
index e28d1204a2..68c16813ad 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
@@ -160,9 +160,9 @@ public class IcebergResourceOperator implements
SinkResourceOperator {
List<IcebergColumnInfo> columnList = new ArrayList<>();
for (StreamSinkFieldEntity field : fieldList) {
IcebergColumnInfo column =
IcebergColumnInfo.getFromJson(field.getExtParams());
- column.setName(field.getFieldName());
- column.setType(field.getFieldType());
- column.setDesc(field.getFieldComment());
+ column.setFieldName(field.getFieldName());
+ column.setFieldType(field.getFieldType());
+ column.setFieldComment(field.getFieldComment());
column.setRequired(field.getIsRequired() != null &&
field.getIsRequired() > 0);
columnList.add(column);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index eb01dbb981..4001b79981 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -17,12 +17,14 @@
package org.apache.inlong.manager.service.sink.iceberg;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -34,12 +36,14 @@ import
org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkRequest;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -102,7 +106,7 @@ public class IcebergSinkOperator extends
AbstractSinkOperator {
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
- List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+ List<SinkField> sinkFields = getSinkFields(entity.getId());
sink.setSinkFieldList(sinkFields);
return sink;
}
@@ -127,4 +131,65 @@ public class IcebergSinkOperator extends
AbstractSinkOperator {
}
}
+ /**
+  * Save the sink fields carried by the request.
+  * Each field is checked, copied into a {@link StreamSinkFieldEntity}, and its
+  * Iceberg column info is serialized to JSON and stored in the entity's ext params.
+  * All entities are then inserted with a single {@code insertAll} call.
+  * Nothing is written when the request has no fields.
+  *
+  * @param request sink request providing the field list plus the group id,
+  *        stream id, sink type and sink id stamped onto every saved field
+  * @throws BusinessException if the column info of a field cannot be serialized
+  */
+ @Override
+ public void saveFieldOpt(SinkRequest request) {
+     List<SinkField> fieldList = request.getSinkFieldList();
+     LOGGER.info("begin to save iceberg sink fields={}", fieldList);
+     if (CollectionUtils.isEmpty(fieldList)) {
+         return;
+     }
+
+     String groupId = request.getInlongGroupId();
+     String streamId = request.getInlongStreamId();
+     String sinkType = request.getSinkType();
+     Integer sinkId = request.getId();
+     List<StreamSinkFieldEntity> entityList = new ArrayList<>(fieldList.size());
+     for (SinkField fieldInfo : fieldList) {
+         this.checkFieldInfo(fieldInfo);
+         StreamSinkFieldEntity fieldEntity =
+                 CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+         // Fall back to the field name when no comment was supplied.
+         if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+             fieldEntity.setFieldComment(fieldEntity.getFieldName());
+         }
+         try {
+             IcebergColumnInfo dto = IcebergColumnInfo.getFromRequest(fieldInfo);
+             fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+         } catch (Exception e) {
+             LOGGER.error("serializing iceberg sink field info to json string failed", e);
+             // Keep the failure reason in the message so it is not lost to the caller.
+             throw new BusinessException(String.format("%s: %s",
+                     ErrorCodeEnum.SINK_SAVE_FAILED.getMessage(), e.getMessage()));
+         }
+         fieldEntity.setInlongGroupId(groupId);
+         fieldEntity.setInlongStreamId(streamId);
+         fieldEntity.setSinkType(sinkType);
+         fieldEntity.setSinkId(sinkId);
+         fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+         entityList.add(fieldEntity);
+     }
+
+     sinkFieldMapper.insertAll(entityList);
+     LOGGER.info("success to save iceberg sink fields");
+ }
+
+ /**
+  * Load the fields of the given sink.
+  * An entity with non-blank ext params is returned as an {@link IcebergColumnInfo}
+  * parsed from that JSON, with the entity's properties copied onto it afterwards.
+  * Any other entity is returned as a plain {@link SinkField}.
+  *
+  * @param sinkId id of the sink whose fields are queried
+  * @return the sink fields, or an empty list when the sink has none
+  */
+ @Override
+ public List<SinkField> getSinkFields(Integer sinkId) {
+     List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId);
+     List<SinkField> fieldList = new ArrayList<>();
+     if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+         return fieldList;
+     }
+     for (StreamSinkFieldEntity field : sinkFieldEntities) {
+         if (StringUtils.isNotBlank(field.getExtParams())) {
+             IcebergColumnInfo icebergColumnInfo = IcebergColumnInfo.getFromJson(field.getExtParams());
+             CommonBeanUtils.copyProperties(field, icebergColumnInfo, true);
+             fieldList.add(icebergColumnInfo);
+         } else {
+             // Only allocate the generic field when there is no Iceberg-specific info.
+             SinkField sinkField = new SinkField();
+             CommonBeanUtils.copyProperties(field, sinkField, true);
+             fieldList.add(sinkField);
+         }
+     }
+     return fieldList;
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
index cb29ffd6ef..d594ab26c0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
@@ -112,9 +112,9 @@ public class IcebergSourceOperator extends
AbstractSourceOperator {
List<IcebergColumnInfo> existColumns =
IcebergCatalogUtils.getColumns(metastoreUri, dbName, tableName);
for (IcebergColumnInfo columnInfo : existColumns) {
StreamField streamField = new StreamField();
- streamField.setFieldName(columnInfo.getName());
-
streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getType()));
- streamField.setFieldComment(columnInfo.getDesc());
+ streamField.setFieldName(columnInfo.getFieldName());
+
streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getFieldType()));
+ streamField.setFieldComment(columnInfo.getFieldComment());
streamFields.add(streamField);
}
updateField(sourceRequest.getInlongGroupId(),
sourceRequest.getInlongStreamId(), streamFields);