This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b136bc71fe [INLONG-8815][Manager] Supports configuring iceberg 
streamSources (#8816)
b136bc71fe is described below

commit b136bc71fefd505118743424824cb17cf50990d4
Author: fuweng11 <[email protected]>
AuthorDate: Mon Sep 11 16:08:00 2023 +0800

    [INLONG-8815][Manager] Supports configuring iceberg streamSources (#8816)
---
 inlong-audit/audit-store/pom.xml                   |  8 ++
 .../inlong/manager/common/consts/SinkType.java     |  1 -
 .../inlong/manager/common/consts/StreamType.java   |  2 +
 inlong-manager/manager-dao/pom.xml                 |  8 ++
 .../pojo/sort/node/ExtractNodeProviderFactory.java |  2 +
 .../pojo/sort/node/provider/IcebergProvider.java   | 31 +++++++-
 .../manager/pojo/source/iceberg/IcebergSource.java | 70 ++++++++++++++++++
 .../pojo/source/iceberg/IcebergSourceDTO.java      | 80 ++++++++++++++++++++
 .../pojo/source/iceberg/IcebergSourceRequest.java  | 59 +++++++++++++++
 .../source/iceberg/IcebergSourceOperator.java      | 86 ++++++++++++++++++++++
 .../inlong/sort/protocol/node/ExtractNode.java     |  4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |  4 +-
 ...bergExtracNode.java => IcebergExtractNode.java} |  4 +-
 .../parser/Iceberg2StarRocksSqlParserTest.java     |  6 +-
 14 files changed, 352 insertions(+), 13 deletions(-)

diff --git a/inlong-audit/audit-store/pom.xml b/inlong-audit/audit-store/pom.xml
index 1ec2947d2d..45bbb66a97 100644
--- a/inlong-audit/audit-store/pom.xml
+++ b/inlong-audit/audit-store/pom.xml
@@ -115,6 +115,14 @@
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-autoconfigure</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.sun</groupId>
+                    <artifactId>tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun</groupId>
+                    <artifactId>jconsole</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index be60980f4c..f11e9daf6d 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -23,7 +23,6 @@ package org.apache.inlong.manager.common.consts;
 public class SinkType extends StreamType {
 
     public static final String HIVE = "HIVE";
-    public static final String ICEBERG = "ICEBERG";
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String HBASE = "HBASE";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index 77ba1e3ab5..afbd57bc50 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -28,4 +28,6 @@ public class StreamType {
     public static final String SQLSERVER = "SQLSERVER";
     public static final String ORACLE = "ORACLE";
     public static final String PULSAR = "PULSAR";
+    public static final String ICEBERG = "ICEBERG";
+
 }
diff --git a/inlong-manager/manager-dao/pom.xml 
b/inlong-manager/manager-dao/pom.xml
index 13d850b8e5..29e1fb4a24 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -65,6 +65,14 @@
                     <groupId>org.springframework.boot</groupId>
                     <artifactId>spring-boot-autoconfigure</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.sun</groupId>
+                    <artifactId>tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun</groupId>
+                    <artifactId>jconsole</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
index 189890e733..07da9d3b5a 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.sort.node.provider.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.IcebergProvider;
 import org.apache.inlong.manager.pojo.sort.node.provider.KafkaProvider;
 import org.apache.inlong.manager.pojo.sort.node.provider.MongoDBProvider;
 import org.apache.inlong.manager.pojo.sort.node.provider.MySQLBinlogProvider;
@@ -56,6 +57,7 @@ public class ExtractNodeProviderFactory {
         EXTRACT_NODE_PROVIDER_LIST.add(new SQLServerProvider());
         EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSQLProvider());
         EXTRACT_NODE_PROVIDER_LIST.add(new MySQLBinlogProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new IcebergProvider());
 
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
index 1df0c0f567..416a409ac5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -17,15 +17,19 @@
 
 package org.apache.inlong.manager.pojo.sort.node.provider;
 
-import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.consts.StreamType;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
@@ -35,11 +39,32 @@ import java.util.Map;
 /**
  * The Provider for creating Iceberg load nodes.
  */
-public class IcebergProvider implements LoadNodeProvider {
+public class IcebergProvider implements ExtractNodeProvider, LoadNodeProvider {
 
     @Override
     public Boolean accept(String sinkType) {
-        return SinkType.ICEBERG.equals(sinkType);
+        return StreamType.ICEBERG.equals(sinkType);
+    }
+
+    @Override
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+        IcebergSource icebergSource = (IcebergSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = 
parseStreamFieldInfos(icebergSource.getFieldList(), 
icebergSource.getSourceName());
+        Map<String, String> properties = 
parseProperties(icebergSource.getProperties());
+
+        return new IcebergExtractNode(icebergSource.getSourceName(),
+                icebergSource.getSourceName(),
+                fieldInfos,
+                null,
+                icebergSource.getUri(),
+                icebergSource.getWarehouse(),
+                icebergSource.getDatabase(),
+                icebergSource.getTableName(),
+                CatalogType.HIVE,
+                "HIVE",
+                icebergSource.getPrimaryKey(),
+                null,
+                properties);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..1e18ea5020
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSource.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.iceberg;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * The Iceberg source info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Iceberg source info")
+@JsonTypeDefine(value = SourceType.ICEBERG)
+public class IcebergSource extends StreamSource {
+
+    @ApiModelProperty("Iceberg data uri")
+    private String uri;
+
+    @ApiModelProperty("Iceberg data warehouse dir")
+    private String warehouse;
+
+    @ApiModelProperty("Database name")
+    private String database;
+
+    @ApiModelProperty("Table name")
+    private String tableName;
+
+    @ApiModelProperty("PrimaryKey")
+    private String primaryKey;
+
+    public IcebergSource() {
+        this.setSourceType(SourceType.ICEBERG);
+    }
+
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, IcebergSourceRequest::new);
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceDTO.java
new file mode 100644
index 0000000000..a8e4d6b5fe
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceDTO.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.iceberg;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Iceberg source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class IcebergSourceDTO {
+
+    @ApiModelProperty("Iceberg data uri")
+    private String uri;
+
+    @ApiModelProperty("Iceberg data warehouse dir")
+    private String warehouse;
+
+    @ApiModelProperty("Database name")
+    private String database;
+
+    @ApiModelProperty("Table name")
+    private String tableName;
+
+    @ApiModelProperty("PrimaryKey")
+    private String primaryKey;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static IcebergSourceDTO getFromRequest(IcebergSourceRequest 
request, String extParams) {
+        IcebergSourceDTO dto = StringUtils.isNotBlank(extParams)
+                ? IcebergSourceDTO.getFromJson(extParams)
+                : new IcebergSourceDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string
+     */
+    public static IcebergSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, IcebergSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("parse extParams of IcebergSource failure: 
%s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceRequest.java
new file mode 100644
index 0000000000..ae62a6bb4c
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/iceberg/IcebergSourceRequest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.iceberg;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Request info of the Hudi source
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Iceberg source")
+@JsonTypeDefine(value = SourceType.ICEBERG)
+public class IcebergSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("Iceberg data uri")
+    private String uri;
+
+    @ApiModelProperty("Iceberg data warehouse dir")
+    private String warehouse;
+
+    @ApiModelProperty("Database name")
+    private String database;
+
+    @ApiModelProperty("Table name")
+    private String tableName;
+
+    @ApiModelProperty("PrimaryKey")
+    private String primaryKey;
+
+    public IcebergSourceRequest() {
+        this.setSourceType(SourceType.ICEBERG);
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
new file mode 100644
index 0000000000..6e75373363
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/iceberg/IcebergSourceOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.iceberg;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceDTO;
+import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Iceberg stream source operator
+ */
+@Service
+public class IcebergSourceOperator extends AbstractSourceOperator {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.ICEBERG.equals(sourceType);
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.ICEBERG;
+    }
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity 
targetEntity) {
+        IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            IcebergSourceDTO dto = 
IcebergSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of Kafka SourceDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public StreamSource getFromEntity(StreamSourceEntity entity) {
+        IcebergSource source = new IcebergSource();
+        if (entity == null) {
+            return source;
+        }
+
+        IcebergSourceDTO dto = 
IcebergSourceDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, source, true);
+        CommonBeanUtils.copyProperties(dto, source, true);
+
+        List<StreamField> sourceFields = super.getSourceFields(entity.getId());
+        source.setFieldList(sourceFields);
+        return source;
+    }
+}
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 543f8cf3be..09e67d573d 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -65,7 +65,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = RedisExtractNode.class, name = 
"redisExtract"),
         @JsonSubTypes.Type(value = DorisExtractNode.class, name = 
"dorisExtract"),
         @JsonSubTypes.Type(value = HudiExtractNode.class, name = 
"hudiExtract"),
-        @JsonSubTypes.Type(value = IcebergExtracNode.class, name = 
"icebergExtract"),
+        @JsonSubTypes.Type(value = IcebergExtractNode.class, name = 
"icebergExtract"),
 })
 @Data
 @NoArgsConstructor
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index f755f439d7..b926efd0dc 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.DorisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.FileSystemExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
@@ -79,7 +79,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = RedisExtractNode.class, name = 
"redisExtract"),
         @JsonSubTypes.Type(value = DorisExtractNode.class, name = 
"dorisExtract"),
         @JsonSubTypes.Type(value = HudiExtractNode.class, name = 
"hudiExtract"),
-        @JsonSubTypes.Type(value = IcebergExtracNode.class, name = 
"icebergExtract"),
+        @JsonSubTypes.Type(value = IcebergExtractNode.class, name = 
"icebergExtract"),
         @JsonSubTypes.Type(value = TransformNode.class, name = 
"baseTransform"),
         @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
similarity index 97%
rename from 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
rename to 
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
index b876d96aff..14fec78da8 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtracNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/IcebergExtractNode.java
@@ -42,7 +42,7 @@ import java.util.Map;
 @JsonTypeName("icebergExtract")
 @JsonInclude(JsonInclude.Include.NON_NULL)
 @Data
-public class IcebergExtracNode extends ExtractNode implements Serializable {
+public class IcebergExtractNode extends ExtractNode implements Serializable {
 
     @JsonProperty("tableName")
     @Nonnull
@@ -72,7 +72,7 @@ public class IcebergExtracNode extends ExtractNode implements 
Serializable {
     @Nullable
     private Long startSnapShotId;
 
-    public IcebergExtracNode(
+    public IcebergExtractNode(
             @Nonnull @JsonProperty("id") String id,
             @Nonnull @JsonProperty("name") String name,
             @Nonnull @JsonProperty("fields") List<FieldInfo> fields,
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
index 5ec917debe..d9c7c26fc0 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/Iceberg2StarRocksSqlParserTest.java
@@ -26,7 +26,7 @@ import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant;
 import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.node.extract.IcebergExtracNode;
+import org.apache.inlong.sort.protocol.node.extract.IcebergExtractNode;
 import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
@@ -83,9 +83,9 @@ public class Iceberg2StarRocksSqlParserTest extends 
AbstractTestBase {
                 .collect(Collectors.toList());
     }
 
-    private IcebergExtracNode buildIcebergExtracNode(String id) {
+    private IcebergExtractNode buildIcebergExtracNode(String id) {
 
-        return new IcebergExtracNode(id, "iceberg-source", fields(), null, uri,
+        return new IcebergExtractNode(id, "iceberg-source", fields(), null, 
uri,
                 warehouse, icDatabase, icTable, 
IcebergConstant.CatalogType.HIVE, catalogName,
                 null, null, null);
 

Reply via email to