This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b136bc71fe [INLONG-8815][Manager] Supports configuring iceberg
streamSources (#8816)
b136bc71fe is described below
commit b136bc71fefd505118743424824cb17cf50990d4
Author: fuweng11 <[email protected]>
AuthorDate: Mon Sep 11 16:08:00 2023 +0800
[INLONG-8815][Manager] Supports configuring iceberg streamSources (#8816)
---
inlong-audit/audit-store/pom.xml | 8 ++
.../inlong/manager/common/consts/SinkType.java | 1 -
.../inlong/manager/common/consts/StreamType.java | 2 +
inlong-manager/manager-dao/pom.xml | 8 ++
.../pojo/sort/node/ExtractNodeProviderFactory.java | 2 +
.../pojo/sort/node/provider/IcebergProvider.java | 31 +++++++-
.../manager/pojo/source/iceberg/IcebergSource.java | 70 ++++++++++++++++++
.../pojo/source/iceberg/IcebergSourceDTO.java | 80 ++++++++++++++++++++
.../pojo/source/iceberg/IcebergSourceRequest.java | 59 +++++++++++++++
.../source/iceberg/IcebergSourceOperator.java | 86 ++++++++++++++++++++++
.../inlong/sort/protocol/node/ExtractNode.java | 4 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
...bergExtracNode.java => IcebergExtractNode.java} | 4 +-
.../parser/Iceberg2StarRocksSqlParserTest.java | 6 +-
14 files changed, 352 insertions(+), 13 deletions(-)
diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 1ec2947d2d..45bbb66a97 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -115,6 +115,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun</groupId>
+ <artifactId>tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun</groupId>
+ <artifactId>jconsole</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index be60980f4c..f11e9daf6d 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -23,7 +23,6 @@ package org.apache.inlong.manager.common.consts;
public class SinkType extends StreamType {
public static final String HIVE = "HIVE";
- public static final String ICEBERG = "ICEBERG";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String HBASE = "HBASE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index 77ba1e3ab5..afbd57bc50 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -28,4 +28,6 @@ public class StreamType {
public static final String SQLSERVER = "SQLSERVER";
public static final String ORACLE = "ORACLE";
public static final String PULSAR = "PULSAR";
+ public static final String ICEBERG = "ICEBERG";
+
}
diff --git a/inlong-manager/manager-dao/pom.xml
b/inlong-manager/manager-dao/pom.xml
index 13d850b8e5..29e1fb4a24 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -65,6 +65,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun</groupId>
+ <artifactId>tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun</groupId>
+ <artifactId>jconsole</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
index 189890e733..07da9d3b5a 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.sort.node.provider.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.IcebergProvider;
import org.apache.inlong.manager.pojo.sort.node.provider.KafkaProvider;
import org.apache.inlong.manager.pojo.sort.node.provider.MongoDBProvider;
import org.apache.inlong.manager.pojo.sort.node.provider.MySQLBinlogProvider;
@@ -56,6 +57,7 @@ public class ExtractNodeProviderFactory {
EXTRACT_NODE_PROVIDER_LIST.add(new SQLServerProvider());
EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSQLProvider());
EXTRACT_NODE_PROVIDER_LIST.add(new MySQLBinlogProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new IcebergProvider());
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 1df0c0f567..416a409ac5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -17,15 +17,19 @@
package org.apache.inlong.manager.pojo.sort.node.provider;
-import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.consts.StreamType;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.IcebergConstant;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
@@ -35,11 +39,32 @@ import java.util.Map;
/**
* The Provider for creating Iceberg load nodes.
*/
-public class IcebergProvider implements LoadNodeProvider {
+public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
@Override
public Boolean accept(String sinkType) {
- return SinkType.ICEBERG.equals(sinkType);
+ return StreamType.ICEBERG.equals(sinkType);
+ }
+
+ @Override
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+ IcebergSource icebergSource = (IcebergSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos =
parseStreamFieldInfos(icebergSource.getFieldList(),
icebergSource.getSourceName());
+ Map<String, String> properties =
parseProperties(icebergSource.getProperties());
+
+ return new IcebergExtractNode(icebergSource.getSourceName(),
+ icebergSource.getSourceName(),
+ fieldInfos,
+ null,
+ icebergSource.getUri(),
+ icebergSource.getWarehouse(),
+ icebergSource.getDatabase(),
+ icebergSource.getTableName(),
+ CatalogType.HIVE,
+ "HIVE",
+ icebergSource.getPrimaryKey(),
+ null,
+ properties);
}
@Override
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..1e18ea5020
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSource.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.iceberg;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * The Iceberg source info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Iceberg source info")
+@JsonTypeDefine(value = SourceType.ICEBERG)
+public class IcebergSource extends StreamSource {
+
+ @ApiModelProperty("Iceberg data uri")
+ private String uri;
+
+ @ApiModelProperty("Iceberg data warehouse dir")
+ private String warehouse;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Table name")
+ private String tableName;
+
+ @ApiModelProperty("PrimaryKey")
+ private String primaryKey;
+
+ public IcebergSource() {
+ this.setSourceType(SourceType.ICEBERG);
+ }
+
+ @Override
+ public SourceRequest genSourceRequest() {
+ return CommonBeanUtils.copyProperties(this, IcebergSourceRequest::new);
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceDTO.java
new file mode 100644
index 0000000000..a8e4d6b5fe
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceDTO.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.iceberg;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Iceberg source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IcebergSourceDTO {
+
+ @ApiModelProperty("Iceberg data uri")
+ private String uri;
+
+ @ApiModelProperty("Iceberg data warehouse dir")
+ private String warehouse;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Table name")
+ private String tableName;
+
+ @ApiModelProperty("PrimaryKey")
+ private String primaryKey;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static IcebergSourceDTO getFromRequest(IcebergSourceRequest
request, String extParams) {
+ IcebergSourceDTO dto = StringUtils.isNotBlank(extParams)
+ ? IcebergSourceDTO.getFromJson(extParams)
+ : new IcebergSourceDTO();
+ return CommonBeanUtils.copyProperties(request, dto, true);
+ }
+
+ /**
+ * Get the dto instance from the JSON string
+ */
+ public static IcebergSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, IcebergSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("parse extParams of IcebergSource failure:
%s", e.getMessage()));
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceRequest.java
new file mode 100644
index 0000000000..ae62a6bb4c
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.iceberg;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Request info of the Iceberg source
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Iceberg source")
+@JsonTypeDefine(value = SourceType.ICEBERG)
+public class IcebergSourceRequest extends SourceRequest {
+
+ @ApiModelProperty("Iceberg data uri")
+ private String uri;
+
+ @ApiModelProperty("Iceberg data warehouse dir")
+ private String warehouse;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Table name")
+ private String tableName;
+
+ @ApiModelProperty("PrimaryKey")
+ private String primaryKey;
+
+ public IcebergSourceRequest() {
+ this.setSourceType(SourceType.ICEBERG);
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
new file mode 100644
index 0000000000..6e75373363
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.iceberg;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceDTO;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Iceberg stream source operator
+ */
+@Service
+public class IcebergSourceOperator extends AbstractSourceOperator {
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.ICEBERG.equals(sourceType);
+ }
+
+ @Override
+ protected String getSourceType() {
+ return SourceType.ICEBERG;
+ }
+
+ @Override
+ protected void setTargetEntity(SourceRequest request, StreamSourceEntity
targetEntity) {
+ IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request;
+ CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+ try {
+ IcebergSourceDTO dto =
IcebergSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams());
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ String.format("serialize extParams of Kafka SourceDTO
failure: %s", e.getMessage()));
+ }
+ }
+
+ @Override
+ public StreamSource getFromEntity(StreamSourceEntity entity) {
+ IcebergSource source = new IcebergSource();
+ if (entity == null) {
+ return source;
+ }
+
+ IcebergSourceDTO dto =
IcebergSourceDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(entity, source, true);
+ CommonBeanUtils.copyProperties(dto, source, true);
+
+ List<StreamField> sourceFields = super.getSourceFields(entity.getId());
+ source.setFieldList(sourceFields);
+ return source;
+ }
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 543f8cf3be..09e67d573d 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -65,7 +65,7 @@ import java.util.Map;
@JsonSubTypes.Type(value = RedisExtractNode.class, name =
"redisExtract"),
@JsonSubTypes.Type(value = DorisExtractNode.class, name =
"dorisExtract"),
@JsonSubTypes.Type(value = HudiExtractNode.class, name =
"hudiExtract"),
- @JsonSubTypes.Type(value = IcebergExtracNode.class, name =
"icebergExtract"),
+ @JsonSubTypes.Type(value = IcebergExtractNode.class, name =
"icebergExtract"),
})
@Data
@NoArgsConstructor
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index f755f439d7..b926efd0dc 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -79,7 +79,7 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = RedisExtractNode.class, name =
"redisExtract"),
@JsonSubTypes.Type(value = DorisExtractNode.class, name =
"dorisExtract"),
@JsonSubTypes.Type(value = HudiExtractNode.class, name =
"hudiExtract"),
- @JsonSubTypes.Type(value = IcebergExtracNode.class, name =
"icebergExtract"),
+ @JsonSubTypes.Type(value = IcebergExtractNode.class, name =
"icebergExtract"),
@JsonSubTypes.Type(value = TransformNode.class, name =
"baseTransform"),
@JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
similarity index 97%
rename from
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
rename to
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
index b876d96aff..14fec78da8 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
@@ -42,7 +42,7 @@ import java.util.Map;
@JsonTypeName("icebergExtract")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
-public class IcebergExtracNode extends ExtractNode implements Serializable {
+public class IcebergExtractNode extends ExtractNode implements Serializable {
@JsonProperty("tableName")
@Nonnull
@@ -72,7 +72,7 @@ public class IcebergExtracNode extends ExtractNode implements
Serializable {
@Nullable
private Long startSnapShotId;
- public IcebergExtracNode(
+ public IcebergExtractNode(
@Nonnull @JsonProperty("id") String id,
@Nonnull @JsonProperty("name") String name,
@Nonnull @JsonProperty("fields") List<FieldInfo> fields,
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
index 5ec917debe..d9c7c26fc0 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
@@ -26,7 +26,7 @@ import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.constant.IcebergConstant;
import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
@@ -83,9 +83,9 @@ public class Iceberg2StarRocksSqlParserTest extends
AbstractTestBase {
.collect(Collectors.toList());
}
- private IcebergExtracNode buildIcebergExtracNode(String id) {
+ private IcebergExtractNode buildIcebergExtracNode(String id) {
- return new IcebergExtracNode(id, "iceberg-source", fields(), null, uri,
+ return new IcebergExtractNode(id, "iceberg-source", fields(), null,
uri,
warehouse, icDatabase, icTable,
IcebergConstant.CatalogType.HIVE, catalogName,
null, null, null);