This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dc12a237da [INLONG-8838][Sort] IcebergSource support metadata (#8895)
dc12a237da is described below

commit dc12a237da7ef791b4e76d3247014b8f4c5d9235
Author: vernedeng <[email protected]>
AuthorDate: Mon Sep 18 11:59:16 2023 +0800

    [INLONG-8838][Sort] IcebergSource support metadata (#8895)
---
 inlong-manager/manager-service/pom.xml             |  58 +--
 .../resource/sink/hudi/HudiCatalogClient.java      |  53 +-
 .../manager/service/sort/SortServiceImplTest.java  |  11 +-
 .../org/apache/inlong/sort/base/Constants.java     |   2 +
 .../inlong/sort/base/metric/SourceMetricData.java  |   6 +
 .../sort/base/util/CalculateObjectSizeUtils.java   |   7 +
 .../sort-connectors/iceberg/pom.xml                |  34 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java     |   3 +-
 .../sort/iceberg/IcebergReadableMetadata.java      |  65 +++
 .../inlong/sort/iceberg/source/IcebergSource.java  | 565 +++++++++++++++++++++
 .../sort/iceberg/source/IcebergTableSource.java    | 270 ++++++++++
 .../iceberg/source/reader/ArrayBatchRecords.java   | 179 +++++++
 .../reader/ArrayPoolDataIteratorBatcher.java       | 138 +++++
 .../iceberg/source/reader/IcebergSourceReader.java |  79 +++
 .../source/reader/IcebergSourceRecordEmitter.java} |  38 +-
 .../source/reader/IcebergSourceSplitReader.java    | 143 ++++++
 .../reader/InlongIcebergSourceReaderMetrics.java   |  53 ++
 .../source/reader/MetaDataReaderFunction.java      |  74 +++
 .../sort/iceberg/source/reader/RecordFactory.java  |  36 ++
 .../source/reader/RowDataReaderFunction.java       |  88 ++++
 .../source/reader/RowDataRecordFactory.java        |  74 +++
 .../source/utils/RecyclableJoinedRowData.java      | 251 +++++++++
 .../iceberg/source/utils/RowDataCloneUtil.java     |  71 +++
 .../sort-flink-dependencies/pom.xml                |   5 +
 licenses/inlong-sort-connectors/LICENSE            |  10 +
 pom.xml                                            |   1 +
 26 files changed, 2220 insertions(+), 94 deletions(-)

diff --git a/inlong-manager/manager-service/pom.xml 
b/inlong-manager/manager-service/pom.xml
index fe32b68874..d6f1aff1d4 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -252,63 +252,9 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.calcite</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.calcite.avatica</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>apache-curator</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-yarn-registry</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-llap-tez</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-vector-code-gen</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.ivy</groupId>
-                    <artifactId>ivy</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.codehaus.groovy</groupId>
-                    <artifactId>groovy-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.antlr</groupId>
-                    <artifactId>ST4</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>stax</groupId>
-                    <artifactId>stax-api</artifactId>
-                </exclusion>
-            </exclusions>
+            <artifactId>hive-standalone-metastore</artifactId>
+            <version>${hive3x.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.elasticsearch.client</groupId>
             <artifactId>elasticsearch-rest-high-level-client</artifactId>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 972fe52a9b..defa95f8c7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -23,21 +23,29 @@ import 
org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -81,7 +89,7 @@ public class HudiCatalogClient implements AutoCloseable {
     public void open() {
         if (this.client == null) {
             try {
-                this.client = Hive.get(hiveConf).getMSC();
+                this.client = new HiveMetaStoreClient(hiveConf);
             } catch (Exception e) {
                 throw new RuntimeException("Failed to create hive metastore 
client", e);
             }
@@ -188,7 +196,7 @@ public class HudiCatalogClient implements AutoCloseable {
             HudiTableInfo tableInfo,
             boolean useRealTimeInputFormat)
             throws TException, IOException {
-        Table hiveTable = 
org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, tableName);
+        Table hiveTable = this.getEmptyTable(dbName, tableName);
         
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
         hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
@@ -250,4 +258,43 @@ public class HudiCatalogClient implements AutoCloseable {
         }
     }
 
+    public Table getEmptyTable(String databaseName, String tableName) {
+        StorageDescriptor sd = new StorageDescriptor();
+        {
+            sd.setSerdeInfo(new SerDeInfo());
+            sd.setNumBuckets(-1);
+            sd.setBucketCols(new ArrayList<String>());
+            sd.setCols(new ArrayList<FieldSchema>());
+            sd.setParameters(new HashMap<String, String>());
+            sd.setSortCols(new ArrayList<Order>());
+            sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            // We have to use MetadataTypedColumnsetSerDe because 
LazySimpleSerDe does
+            // not support a table with no columns.
+            
sd.getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+            
sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+            sd.setInputFormat(SequenceFileInputFormat.class.getName());
+            SkewedInfo skewInfo = new SkewedInfo();
+            skewInfo.setSkewedColNames(new ArrayList<String>());
+            skewInfo.setSkewedColValues(new ArrayList<List<String>>());
+            skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, 
String>());
+            sd.setSkewedInfo(skewInfo);
+        }
+
+        org.apache.hadoop.hive.metastore.api.Table t = new 
org.apache.hadoop.hive.metastore.api.Table();
+        {
+            t.setSd(sd);
+            t.setPartitionKeys(new ArrayList<FieldSchema>());
+            t.setParameters(new HashMap<String, String>());
+            t.setTableType(TableType.MANAGED_TABLE.toString());
+            t.setDbName(databaseName);
+            t.setTableName(tableName);
+            // set create time
+            t.setCreateTime((int) (System.currentTimeMillis() / 1000));
+        }
+        // Explictly set the bucketing version
+        t.getParameters().put(hive_metastoreConstants.TABLE_BUCKETING_VERSION,
+                "2");
+        return t;
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 993ffd5734..5032057db0 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -49,7 +49,6 @@ import org.apache.inlong.manager.service.node.DataNodeService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 
-import org.json.JSONObject;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -140,9 +139,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Order(2)
     @Transactional
     public void testSourceCorrectParamsOfNoTagSortCluster() {
-        SortSourceConfigResponse response = 
sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
-        JSONObject jo = new JSONObject(response);
-        System.out.println(jo);
+        SortSourceConfigResponse response = 
sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");;
         Assertions.assertEquals(0, response.getCode());
         Assertions.assertNotNull(response.getData());
         Assertions.assertNotNull(response.getMd5());
@@ -193,8 +190,6 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Transactional
     public void testClusterCorrectParams() {
         SortClusterResponse response = 
sortService.getClusterConfig(TEST_CLUSTER_1, "");
-        JSONObject jo = new JSONObject(response);
-        System.out.println(jo);
         Assertions.assertEquals(0, response.getCode());
         Assertions.assertNotNull(response.getData());
         Assertions.assertNotNull(response.getMd5());
@@ -232,8 +227,6 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Transactional
     public void testSourceCorrectParamsOfTaggedSortCluster() {
         SortSourceConfigResponse response = 
sortService.getSourceConfig(TEST_CLUSTER_2, TEST_TASK_2, "");
-        JSONObject jo = new JSONObject(response);
-        System.out.println(jo);
         Assertions.assertEquals(0, response.getCode());
         Assertions.assertNotNull(response.getMd5());
         Assertions.assertNotNull(response.getMsg());
@@ -248,8 +241,6 @@ public class SortServiceImplTest extends ServiceBaseTest {
         Assertions.assertEquals(1, zone.getTopics().size());
 
         response = sortService.getSourceConfig(TEST_CLUSTER_3, TEST_TASK_3, 
"");
-        jo = new JSONObject(response);
-        System.out.println(jo);
         Assertions.assertEquals(0, response.getCode());
         Assertions.assertNotNull(response.getMd5());
         Assertions.assertNotNull(response.getMsg());
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index c0fbed00c7..72ce9d89b8 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -167,6 +167,8 @@ public final class Constants {
 
     public static final String META_INCREMENTAL = "incremental_inlong";
 
+    public static final String META_INLONG_DATA_TIME = "inlong_data_time";
+
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
                     .stringType()
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index ec64f53b39..c77f688045 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -38,6 +38,7 @@ import static 
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataArraySize;
 import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
 
 /**
@@ -242,6 +243,11 @@ public class SourceMetricData implements MetricData {
         outputMetrics(1, getDataSize(data));
     }
 
+    public void outputMetricsWithEstimate(Object[] records) {
+        long size = getDataArraySize(records);
+        outputMetrics(records.length, size);
+    }
+
     public void outputMetricsWithEstimate(Object data, long fetchDelay, long 
emitDelay) {
         outputMetrics(1, getDataSize(data));
         this.fetchDelay = fetchDelay;
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
index 0826eb2a9e..748522693b 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.base.util;
 import org.apache.flink.table.data.binary.BinaryRowData;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 
 /**
  * calculate tool for object
@@ -44,4 +45,10 @@ public class CalculateObjectSizeUtils {
         return size;
     }
 
+    public static long getDataArraySize(Object[] objects) {
+        return Arrays.stream(objects)
+                .mapToLong(CalculateObjectSizeUtils::getDataSize)
+                .sum();
+    }
+
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
index c481dd8de6..81168093ba 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
@@ -39,6 +39,18 @@
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-connector-base</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libfb303</artifactId>
+            <version>${libfb303.version}</version>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
@@ -62,6 +74,7 @@
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
             <version>${hive3x.version}</version>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -79,6 +92,17 @@
             <artifactId>iceberg-hive-metastore</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version.v1.15}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -87,7 +111,6 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <version>${plugin.shade.version}</version>
-
                 <executions>
                     <execution>
                         <id>shade-flink</id>
@@ -108,7 +131,8 @@
                                     <include>org.apache.hive:*</include>
                                     <!--  Include fixed version 18.0-13.0 of 
flink shaded guava  -->
                                     
<include>org.apache.flink:flink-shaded-guava</include>
-                                    <include>com.google.protobuf:*</include>
+                                    
<include>org.apache.flink:flink-connector-files</include>
+                                    
<include>org.apache.flink:flink-connector-base</include>
                                     <include>org.apache.thrift:*</include>
                                     <include>com.facebook.*:*</include>
                                 </includes>
@@ -123,6 +147,12 @@
                                         
<include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
                                     </includes>
                                 </filter>
+                                <filter>
+                                    
<artifact>org.apache.hive:hive-exec</artifact>
+                                    <excludes>
+                                        
<exclude>com/google/protobuf/**</exclude>
+                                    </excludes>
+                                </filter>
                             </filters>
                             <relocations>
                                 <relocation>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 1edf546c23..ef50b8d3a8 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.iceberg;
 
+import org.apache.inlong.sort.iceberg.source.IcebergTableSource;
+
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -38,7 +40,6 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.flink.IcebergTableSink;
 import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.source.IcebergTableSource;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
new file mode 100644
index 0000000000..e003555a45
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.inlong.sort.base.Constants;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+
+/**
+ * IcebergReadableMetadata
+ */
+public enum IcebergReadableMetadata {
+
+    INLONG_DATA_TIME(
+            Constants.META_INLONG_DATA_TIME,
+            DataTypes.BIGINT().notNull(),
+            r -> System.currentTimeMillis());
+
+    private final String key;
+    private final DataType dataType;
+    private final MetadataConverter converter;
+
+    IcebergReadableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
+        this.key = key;
+        this.dataType = dataType;
+        this.converter = converter;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    public MetadataConverter getConverter() {
+        return converter;
+    }
+
+    public interface MetadataConverter extends Serializable {
+
+        Object read(RowData rowData);
+    }
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
new file mode 100644
index 0000000000..47f7d04b44
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+import org.apache.inlong.sort.iceberg.source.reader.IcebergSourceReader;
+import 
org.apache.inlong.sort.iceberg.source.reader.InlongIcebergSourceReaderMetrics;
+import org.apache.inlong.sort.iceberg.source.reader.MetaDataReaderFunction;
+import org.apache.inlong.sort.iceberg.source.reader.RowDataReaderFunction;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.BaseMetadataTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, 
IcebergEnumeratorState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSource.class);
+
+    private final TableLoader tableLoader;
+    private final ScanContext scanContext;
+    private final ReaderFunction<T> readerFunction;
+    private final SplitAssignerFactory assignerFactory;
+
+    private final MetricOption metricOption;
+
+    // Can't use SerializableTable as enumerator needs a regular table
+    // that can discover table changes
+    private transient Table table;
+
+    IcebergSource(
+            TableLoader tableLoader,
+            ScanContext scanContext,
+            ReaderFunction<T> readerFunction,
+            SplitAssignerFactory assignerFactory,
+            Table table,
+            MetricOption metricOption) {
+        this.tableLoader = tableLoader;
+        this.scanContext = scanContext;
+        this.readerFunction = readerFunction;
+        this.assignerFactory = assignerFactory;
+        this.table = table;
+        this.metricOption = metricOption;
+    }
+
+    String name() {
+        return "IcebergSource-" + lazyTable().name();
+    }
+
+    private String planningThreadName() {
+        // Ideally, operatorId should be used as the threadPoolName as Flink 
guarantees its uniqueness
+        // within a job. SplitEnumeratorContext doesn't expose the 
OperatorCoordinator.Context, which
+        // would contain the OperatorID. Need to discuss with Flink community 
whether it is ok to expose
+        // a public API like the protected method "OperatorCoordinator.Context 
getCoordinatorContext()"
+        // from SourceCoordinatorContext implementation. For now, <table 
name>-<random UUID> is used as
+        // the unique thread pool name.
+        return lazyTable().name() + "-" + UUID.randomUUID();
+    }
+
+    private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+        ExecutorService workerPool =
+                ThreadPools.newWorkerPool(threadName, 
scanContext.planParallelism());
+        try {
+            List<IcebergSourceSplit> splits =
+                    FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), 
scanContext, workerPool);
+            LOG.info(
+                    "Discovered {} splits from table {} during job 
initialization",
+                    splits.size(),
+                    lazyTable().name());
+            return splits;
+        } finally {
+            workerPool.shutdown();
+        }
+    }
+
+    private Table lazyTable() {
+        if (table == null) {
+            tableLoader.open();
+            try (TableLoader loader = tableLoader) {
+                this.table = loader.loadTable();
+            } catch (IOException e) {
+                throw new UncheckedIOException("Failed to close table loader", 
e);
+            }
+        }
+
+        return table;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, IcebergSourceSplit> 
createReader(SourceReaderContext readerContext) {
+        InlongIcebergSourceReaderMetrics metrics =
+                new 
InlongIcebergSourceReaderMetrics(readerContext.metricGroup(), 
lazyTable().name());
+        metrics.registerMetrics(metricOption);
+        return new IcebergSourceReader<>(metrics, readerFunction, 
readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> 
createEnumerator(
+            SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
+        return createEnumerator(enumContext, null);
+    }
+
+    @Override
+    public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> 
restoreEnumerator(
+            SplitEnumeratorContext<IcebergSourceSplit> enumContext, 
IcebergEnumeratorState enumState) {
+        return createEnumerator(enumContext, enumState);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
+        return IcebergSourceSplitSerializer.INSTANCE;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<IcebergEnumeratorState> 
getEnumeratorCheckpointSerializer() {
+        return IcebergEnumeratorStateSerializer.INSTANCE;
+    }
+
+    private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> 
createEnumerator(
+            SplitEnumeratorContext<IcebergSourceSplit> enumContext,
+            @Nullable IcebergEnumeratorState enumState) {
+        SplitAssigner assigner;
+        if (enumState == null) {
+            assigner = assignerFactory.createAssigner();
+        } else {
+            LOG.info(
+                    "Iceberg source restored {} splits from state for table 
{}",
+                    enumState.pendingSplits().size(),
+                    lazyTable().name());
+            assigner = 
assignerFactory.createAssigner(enumState.pendingSplits());
+        }
+
+        if (scanContext.isStreaming()) {
+            ContinuousSplitPlanner splitPlanner =
+                    new ContinuousSplitPlannerImpl(tableLoader.clone(), 
scanContext, planningThreadName());
+            return new ContinuousIcebergEnumerator(
+                    enumContext, assigner, scanContext, splitPlanner, 
enumState);
+        } else {
+            List<IcebergSourceSplit> splits = 
planSplitsForBatch(planningThreadName());
+            assigner.onDiscoveredSplits(splits);
+            return new StaticIcebergEnumerator(enumContext, assigner);
+        }
+    }
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    public static Builder<RowData> forRowData() {
+        return new Builder<>();
+    }
+
+    public static class Builder<T> {
+
+        private TableLoader tableLoader;
+        private Table table;
+        private SplitAssignerFactory splitAssignerFactory;
+        private ReaderFunction<T> readerFunction;
+        private ReadableConfig flinkConfig = new Configuration();
+        private final ScanContext.Builder contextBuilder = 
ScanContext.builder();
+        private TableSchema projectedFlinkSchema;
+        private Boolean exposeLocality;
+        private MetadataConverter[] metadataConverters;
+        private MetricOption metricOption;
+        private String inlongAuditAddress;
+        private String inlongAuditKeys;
+        private String inlongMetrics;
+
+        private final Map<String, String> readOptions = Maps.newHashMap();
+
+        Builder() {
+        }
+
+        public Builder<T> metricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
+        public Builder<T> inlongAuditAddress(String inlongAuditAddress) {
+            this.inlongAuditAddress = inlongAuditAddress;
+            return this;
+        }
+
+        public Builder<T> inlongAuditKeys(String inlongAuditKeys) {
+            this.inlongAuditKeys = inlongAuditKeys;
+            return this;
+        }
+
+        public Builder<T> inlongMetrics(String inlongMetrics) {
+            this.inlongMetrics = inlongMetrics;
+            return this;
+        }
+
+        public Builder<T> metadataConverters(MetadataConverter[] 
metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder<T> tableLoader(TableLoader loader) {
+            this.tableLoader = loader;
+            return this;
+        }
+
+        public Builder<T> table(Table newTable) {
+            this.table = newTable;
+            return this;
+        }
+
+        public Builder<T> assignerFactory(SplitAssignerFactory 
assignerFactory) {
+            this.splitAssignerFactory = assignerFactory;
+            return this;
+        }
+
+        public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
+            this.readerFunction = newReaderFunction;
+            return this;
+        }
+
+        public Builder<T> flinkConfig(ReadableConfig config) {
+            this.flinkConfig = config;
+            return this;
+        }
+
+        public Builder<T> caseSensitive(boolean newCaseSensitive) {
+            readOptions.put(FlinkReadOptions.CASE_SENSITIVE, 
Boolean.toString(newCaseSensitive));
+            return this;
+        }
+
+        public Builder<T> useSnapshotId(Long newSnapshotId) {
+            if (newSnapshotId != null) {
+                readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), 
Long.toString(newSnapshotId));
+            }
+            return this;
+        }
+
+        public Builder<T> streamingStartingStrategy(StreamingStartingStrategy 
newStartingStrategy) {
+            readOptions.put(FlinkReadOptions.STARTING_STRATEGY, 
newStartingStrategy.name());
+            return this;
+        }
+
+        public Builder<T> startSnapshotTimestamp(Long 
newStartSnapshotTimestamp) {
+            if (newStartSnapshotTimestamp != null) {
+                readOptions.put(
+                        FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key(),
+                        Long.toString(newStartSnapshotTimestamp));
+            }
+            return this;
+        }
+
+        public Builder<T> startSnapshotId(Long newStartSnapshotId) {
+            if (newStartSnapshotId != null) {
+                readOptions.put(
+                        FlinkReadOptions.START_SNAPSHOT_ID.key(), 
Long.toString(newStartSnapshotId));
+            }
+            return this;
+        }
+
+        public Builder<T> tag(String tag) {
+            readOptions.put(FlinkReadOptions.TAG.key(), tag);
+            return this;
+        }
+
+        public Builder<T> branch(String branch) {
+            readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+            return this;
+        }
+
+        public Builder<T> startTag(String startTag) {
+            readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+            return this;
+        }
+
+        public Builder<T> endTag(String endTag) {
+            readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+            return this;
+        }
+
+        public Builder<T> endSnapshotId(Long newEndSnapshotId) {
+            if (newEndSnapshotId != null) {
+                readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), 
Long.toString(newEndSnapshotId));
+            }
+            return this;
+        }
+
+        public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
+            if (newAsOfTimestamp != null) {
+                readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), 
Long.toString(newAsOfTimestamp));
+            }
+            return this;
+        }
+
+        public Builder<T> splitSize(Long newSplitSize) {
+            if (newSplitSize != null) {
+                readOptions.put(FlinkReadOptions.SPLIT_SIZE, 
Long.toString(newSplitSize));
+            }
+            return this;
+        }
+
+        public Builder<T> splitLookback(Integer newSplitLookback) {
+            if (newSplitLookback != null) {
+                readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, 
Integer.toString(newSplitLookback));
+            }
+            return this;
+        }
+
+        public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
+            if (newSplitOpenFileCost != null) {
+                readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, 
Long.toString(newSplitOpenFileCost));
+            }
+
+            return this;
+        }
+
+        public Builder<T> streaming(boolean streaming) {
+            readOptions.put(FlinkReadOptions.STREAMING, 
Boolean.toString(streaming));
+            return this;
+        }
+
+        public Builder<T> monitorInterval(Duration newMonitorInterval) {
+            if (newMonitorInterval != null) {
+                readOptions.put(FlinkReadOptions.MONITOR_INTERVAL, 
newMonitorInterval.toNanos() + " ns");
+            }
+            return this;
+        }
+
+        public Builder<T> nameMapping(String newNameMapping) {
+            readOptions.put(TableProperties.DEFAULT_NAME_MAPPING, 
newNameMapping);
+            return this;
+        }
+
+        public Builder<T> project(Schema newProjectedSchema) {
+            this.contextBuilder.project(newProjectedSchema);
+            return this;
+        }
+
+        public Builder<T> project(TableSchema newProjectedFlinkSchema) {
+            this.projectedFlinkSchema = newProjectedFlinkSchema;
+            return this;
+        }
+
+        public Builder<T> filters(List<Expression> newFilters) {
+            this.contextBuilder.filters(newFilters);
+            return this;
+        }
+
+        public Builder<T> limit(Long newLimit) {
+            if (newLimit != null) {
+                readOptions.put(FlinkReadOptions.LIMIT, 
Long.toString(newLimit));
+            }
+            return this;
+        }
+
+        public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
+            readOptions.put(
+                    FlinkReadOptions.INCLUDE_COLUMN_STATS, 
Boolean.toString(newIncludeColumnStats));
+            return this;
+        }
+
+        public Builder<T> planParallelism(int planParallelism) {
+            readOptions.put(
+                    
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.key(),
+                    Integer.toString(planParallelism));
+            return this;
+        }
+
+        public Builder<T> exposeLocality(boolean newExposeLocality) {
+            this.exposeLocality = newExposeLocality;
+            return this;
+        }
+
+        public Builder<T> maxAllowedPlanningFailures(int 
maxAllowedPlanningFailures) {
+            readOptions.put(
+                    
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+                    Integer.toString(maxAllowedPlanningFailures));
+            return this;
+        }
+
+        /**
+         * Set the read properties for Flink source. View the supported 
properties in {@link
+         * FlinkReadOptions}
+         */
+        public Builder<T> set(String property, String value) {
+            readOptions.put(property, value);
+            return this;
+        }
+
+        /**
+         * Set the read properties for Flink source. View the supported 
properties in {@link
+         * FlinkReadOptions}
+         */
+        public Builder<T> setAll(Map<String, String> properties) {
+            readOptions.putAll(properties);
+            return this;
+        }
+
+        /** @deprecated Use {@link #setAll} instead. */
+        @Deprecated
+        public Builder<T> properties(Map<String, String> properties) {
+            readOptions.putAll(properties);
+            return this;
+        }
+
+        public IcebergSource<T> build() {
+            if (table == null) {
+                try (TableLoader loader = tableLoader) {
+                    loader.open();
+                    this.table = tableLoader.loadTable();
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+
+            if (metadataConverters == null) {
+                metadataConverters = new MetadataConverter[0];
+            }
+
+            contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+
+            Schema icebergSchema = table.schema();
+            if (projectedFlinkSchema != null) {
+                contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, 
projectedFlinkSchema));
+            }
+
+            ScanContext context = contextBuilder.build();
+            if (readerFunction == null) {
+                if (table instanceof BaseMetadataTable) {
+                    MetaDataReaderFunction rowDataReaderFunction =
+                            new MetaDataReaderFunction(
+                                    flinkConfig, table.schema(), 
context.project(),
+                                    table.io(), table.encryption(), 
metadataConverters);
+                    this.readerFunction = (ReaderFunction<T>) 
rowDataReaderFunction;
+                } else {
+                    RowDataReaderFunction rowDataReaderFunction =
+                            new RowDataReaderFunction(
+                                    flinkConfig,
+                                    table.schema(),
+                                    context.project(),
+                                    context.nameMapping(),
+                                    context.caseSensitive(),
+                                    table.io(),
+                                    table.encryption(),
+                                    context.filters(),
+                                    metadataConverters);
+                    this.readerFunction = (ReaderFunction<T>) 
rowDataReaderFunction;
+                }
+            }
+            resolveMetricOption();
+            checkRequired();
+            // Since builder already load the table, pass it to the source to 
avoid double loading
+            return new IcebergSource<T>(
+                    tableLoader, context, readerFunction, 
splitAssignerFactory, table, metricOption);
+        }
+
+        private void checkRequired() {
+            Preconditions.checkNotNull(tableLoader, "tableLoader is 
required.");
+            Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory 
is required.");
+            Preconditions.checkNotNull(readerFunction, "readerFunction is 
required.");
+            Preconditions.checkNotNull(inlongAuditAddress, "inlongAuditAddress 
is required.");
+            Preconditions.checkNotNull(inlongAuditKeys, "inlongAuditKeys is 
required.");
+            Preconditions.checkNotNull(inlongMetrics, "inlongMetrics is 
required.");
+            Preconditions.checkNotNull(metricOption, "metricOption is 
required.");
+        }
+
+        private void resolveMetricOption() {
+            if (metricOption != null) {
+                LOG.info("metric option is not null, no need to init it");
+                return;
+            }
+
+            if (StringUtils.isNullOrWhitespaceOnly(inlongAuditAddress)) {
+                inlongAuditAddress = readOptions.get(INLONG_AUDIT.key());
+            }
+            if (StringUtils.isNullOrWhitespaceOnly(inlongAuditKeys)) {
+                inlongAuditKeys = readOptions.get(AUDIT_KEYS.key());
+            }
+            if (StringUtils.isNullOrWhitespaceOnly(inlongMetrics)) {
+                inlongMetrics = readOptions.get(INLONG_METRIC.key());
+            }
+            LOG.info("start to init metric option with audit={}, keys={}, 
metric={}",
+                    inlongAuditAddress, inlongAuditKeys, inlongMetrics);
+            this.metricOption = MetricOption.builder()
+                    .withInlongLabels(inlongMetrics)
+                    .withAuditAddress(inlongAuditAddress)
+                    .withAuditKeys(inlongAuditKeys)
+                    .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                    .build();
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
new file mode 100644
index 0000000000..f55f63fadd
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source;
+
+import org.apache.inlong.sort.iceberg.IcebergReadableMetadata;
+import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.FlinkSource;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Flink Iceberg table source.
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Internal
+public class IcebergTableSource
+        implements
+            ScanTableSource,
+            SupportsProjectionPushDown,
+            SupportsFilterPushDown,
+            SupportsLimitPushDown,
+            SupportsReadingMetadata {
+
+    private int[] projectedFields;
+    private Long limit;
+    private List<Expression> filters;
+    protected DataType producedDataType;
+    protected List<String> metadataKeys;
+
+    private final TableLoader loader;
+    private final TableSchema schema;
+    private final Map<String, String> properties;
+    private final boolean isLimitPushDown;
+    private final ReadableConfig readableConfig;
+
+    private IcebergTableSource(IcebergTableSource toCopy) {
+        this.loader = toCopy.loader;
+        this.schema = toCopy.schema;
+        this.properties = toCopy.properties;
+        this.projectedFields = toCopy.projectedFields;
+        this.isLimitPushDown = toCopy.isLimitPushDown;
+        this.limit = toCopy.limit;
+        this.filters = toCopy.filters;
+        this.readableConfig = toCopy.readableConfig;
+        this.producedDataType = toCopy.producedDataType;
+        this.metadataKeys = toCopy.metadataKeys;
+    }
+
+    public IcebergTableSource(
+            TableLoader loader,
+            TableSchema schema,
+            Map<String, String> properties,
+            ReadableConfig readableConfig) {
+        this(loader, schema, properties, null, false, null, 
ImmutableList.of(), readableConfig);
+    }
+
+    private IcebergTableSource(
+            TableLoader loader,
+            TableSchema schema,
+            Map<String, String> properties,
+            int[] projectedFields,
+            boolean isLimitPushDown,
+            Long limit,
+            List<Expression> filters,
+            ReadableConfig readableConfig) {
+        this.loader = loader;
+        this.schema = schema;
+        this.properties = properties;
+        this.projectedFields = projectedFields;
+        this.isLimitPushDown = isLimitPushDown;
+        this.limit = limit;
+        this.filters = filters;
+        this.readableConfig = readableConfig;
+        this.producedDataType = schema.toPhysicalRowDataType();
+        this.metadataKeys = new ArrayList<>();
+    }
+
+    @Override
+    public void applyProjection(int[][] projectFields) {
+        this.projectedFields = new int[projectFields.length];
+        for (int i = 0; i < projectFields.length; i++) {
+            Preconditions.checkArgument(
+                    projectFields[i].length == 1, "Don't support nested 
projection in iceberg source now.");
+            this.projectedFields[i] = projectFields[i][0];
+        }
+    }
+
+    private DataStreamSource<RowData> createFLIP27Stream(
+            StreamExecutionEnvironment env,
+            TypeInformation<RowData> typeInfo) {
+        SplitAssignerType assignerType =
+                
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
+        MetadataConverter[] converters = getMetadataConverters();
+        IcebergSource<RowData> source =
+                IcebergSource.forRowData()
+                        .tableLoader(loader)
+                        .assignerFactory(assignerType.factory())
+                        .properties(properties)
+                        .project(getProjectedSchema())
+                        .limit(limit)
+                        .filters(filters)
+                        .flinkConfig(readableConfig)
+                        .metadataConverters(converters)
+                        .build();
+        DataStreamSource<RowData> stream =
+                env.fromSource(
+                        source,
+                        WatermarkStrategy.noWatermarks(),
+                        source.name(),
+                        typeInfo);
+        return stream;
+    }
+
+    private TableSchema getProjectedSchema() {
+        if (projectedFields == null) {
+            return schema;
+        } else {
+            String[] fullNames = schema.getFieldNames();
+            DataType[] fullTypes = schema.getFieldDataTypes();
+            return TableSchema.builder()
+                    .fields(
+                            Arrays.stream(projectedFields).mapToObj(i -> 
fullNames[i]).toArray(String[]::new),
+                            Arrays.stream(projectedFields).mapToObj(i -> 
fullTypes[i]).toArray(DataType[]::new))
+                    .build();
+        }
+    }
+
+    @Override
+    public void applyLimit(long newLimit) {
+        this.limit = newLimit;
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+        List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+        List<Expression> expressions = Lists.newArrayList();
+
+        for (ResolvedExpression resolvedExpression : flinkFilters) {
+            Optional<Expression> icebergExpression = 
FlinkFilters.convert(resolvedExpression);
+            if (icebergExpression.isPresent()) {
+                expressions.add(icebergExpression.get());
+                acceptedFilters.add(resolvedExpression);
+            }
+        }
+
+        this.filters = expressions;
+        return Result.of(acceptedFilters, flinkFilters);
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        // TODO: support nested projection
+        return false;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
+
+        final TypeInformation<RowData> typeInfo =
+                runtimeProviderContext.createTypeInformation(producedDataType);
+        return new DataStreamScanProvider() {
+
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, 
StreamExecutionEnvironment execEnv) {
+                return createFLIP27Stream(execEnv, typeInfo);
+            }
+
+            @Override
+            public boolean isBounded() {
+                return FlinkSource.isBounded(properties);
+            }
+        };
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new IcebergTableSource(this);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Iceberg table source";
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(IcebergReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                IcebergReadableMetadata::getKey, 
IcebergReadableMetadata::getDataType));
+
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    private MetadataConverter[] getMetadataConverters() {
+        if (CollectionUtils.isEmpty(metadataKeys)) {
+            return new MetadataConverter[0];
+        }
+        return metadataKeys.stream()
+                .map(
+                        key -> Stream.of(IcebergReadableMetadata.values())
+                                .filter(m -> m.getKey().equals(key))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                .map(IcebergReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
new file mode 100644
index 0000000000..9169b48aaf
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * {@link RecordsWithSplitIds} is used to pass a batch of records from fetcher 
to source reader.
+ * Batching is to improve the efficiency for records handover.
+ *
+ * <p>{@link RecordsWithSplitIds} interface can encapsulate batches from 
multiple splits. This is
+ * the case for Kafka source where fetchers can retrieve records from multiple 
Kafka partitions at
+ * the same time.
+ *
+ * <p>For file-based sources like Iceberg, readers always read one split/file 
at a time. Hence, we
+ * will only have a batch of records for one split here.
+ *
+ * <p>This class uses array to store a batch of records from the same file 
(with the same
+ * fileOffset).
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class ArrayBatchRecords<T> implements 
RecordsWithSplitIds<RecordAndPosition<T>> {
+
+    @Nullable
+    private String splitId;
+    @Nullable
+    private final Pool.Recycler<T[]> recycler;
+    @Nullable
+    private final T[] records;
+    private final int numberOfRecords;
+    private final Set<String> finishedSplits;
+    private final RecordAndPosition<T> recordAndPosition;
+
+    // point to current read position within the records array
+    private int position;
+
+    private ArrayBatchRecords(
+            @Nullable String splitId,
+            @Nullable Pool.Recycler<T[]> recycler,
+            @Nullable T[] records,
+            int numberOfRecords,
+            int fileOffset,
+            long startingRecordOffset,
+            Set<String> finishedSplits) {
+        Preconditions.checkArgument(numberOfRecords >= 0, "numberOfRecords 
can't be negative");
+        Preconditions.checkArgument(fileOffset >= 0, "fileOffset can't be 
negative");
+        Preconditions.checkArgument(startingRecordOffset >= 0, 
"numberOfRecords can't be negative");
+
+        this.splitId = splitId;
+        this.recycler = recycler;
+        this.records = records;
+        this.numberOfRecords = numberOfRecords;
+        this.finishedSplits =
+                Preconditions.checkNotNull(finishedSplits, "finishedSplits can 
be empty but not null");
+        this.recordAndPosition = new RecordAndPosition<>();
+
+        recordAndPosition.set(null, fileOffset, startingRecordOffset);
+        this.position = 0;
+    }
+
+    @Nullable
+    @Override
+    public String nextSplit() {
+        String nextSplit = this.splitId;
+        // set the splitId to null to indicate no more splits
+        // this class only contains record for one split
+        this.splitId = null;
+        return nextSplit;
+    }
+
+    @Nullable
+    @Override
+    public RecordAndPosition<T> nextRecordFromSplit() {
+        if (position < numberOfRecords) {
+            recordAndPosition.record(records[position]);
+            position++;
+            return recordAndPosition;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * This method is called when all records from this batch has been 
emitted. If recycler is set, it
+     * should be called to return the records array back to pool.
+     */
+    @Override
+    public void recycle() {
+        if (recycler != null) {
+            recycler.recycle(records);
+        }
+    }
+
+    @Override
+    public Set<String> finishedSplits() {
+        return finishedSplits;
+    }
+
+    @VisibleForTesting
+    T[] records() {
+        return records;
+    }
+
+    @VisibleForTesting
+    int numberOfRecords() {
+        return numberOfRecords;
+    }
+
+    /**
+     * Create a ArrayBatchRecords backed up an array with records from the 
same file
+     *
+     * @param splitId Iceberg source only read from one split a time. We never 
have multiple records
+     *     from multiple splits.
+     * @param recycler Because {@link DataIterator} with {@link RowData} 
returns an iterator of reused
+     *     RowData object, we need to clone RowData eagerly when constructing 
a batch of records. We
+     *     can use object pool to reuse the RowData array object which can be 
expensive to create.
+     *     This recycler can be provided to recycle the array object back to 
pool after read is
+     *     exhausted. If the {@link DataIterator} returns an iterator of 
non-reused objects, we don't
+     *     need to clone objects. It is cheap to just create the batch array. 
Hence, we don't need
+     *     object pool and recycler can be set to null.
+     * @param records an array (maybe reused) holding a batch of records
+     * @param numberOfRecords actual number of records in the array
+     * @param fileOffset fileOffset for all records in this batch
+     * @param startingRecordOffset starting recordOffset
+     * @param <T> record type
+     */
+    public static <T> ArrayBatchRecords<T> forRecords(
+            String splitId,
+            Pool.Recycler<T[]> recycler,
+            T[] records,
+            int numberOfRecords,
+            int fileOffset,
+            long startingRecordOffset) {
+        return new ArrayBatchRecords<>(
+                splitId,
+                recycler,
+                records,
+                numberOfRecords,
+                fileOffset,
+                startingRecordOffset,
+                Collections.emptySet());
+    }
+
+    /**
+     * Create ab ArrayBatchRecords with only finished split id
+     *
+     * @param splitId for the split that is just exhausted
+     */
+    public static <T> ArrayBatchRecords<T> finishedSplit(String splitId) {
+        return new ArrayBatchRecords<>(null, null, null, 0, 0, 0, 
Collections.singleton(splitId));
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
new file mode 100644
index 0000000000..2ef897b5d4
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.DataIteratorBatcher;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * This implementation stores record batch in array from recyclable pool
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+
+    private final int batchSize;
+    private final int handoverQueueSize;
+    private final RecordFactory<T> recordFactory;
+
+    private transient Pool<T[]> pool;
+
+    ArrayPoolDataIteratorBatcher(ReadableConfig config, RecordFactory<T> 
recordFactory) {
+        this.batchSize = 
config.get(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT);
+        this.handoverQueueSize = 
config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
+        this.recordFactory = recordFactory;
+    }
+
+    @Override
+    public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> batch(
+            String splitId, DataIterator<T> inputIterator) {
+        Preconditions.checkArgument(inputIterator != null, "Input data 
iterator can't be null");
+        // lazily create pool as it is not serializable
+        if (pool == null) {
+            this.pool = createPoolOfBatches(handoverQueueSize);
+        }
+        return new ArrayPoolBatchIterator(splitId, inputIterator, pool);
+    }
+
+    private Pool<T[]> createPoolOfBatches(int numBatches) {
+        Pool<T[]> poolOfBatches = new Pool<>(numBatches);
+        for (int batchId = 0; batchId < numBatches; batchId++) {
+            T[] batch = recordFactory.createBatch(batchSize);
+            poolOfBatches.add(batch);
+        }
+
+        return poolOfBatches;
+    }
+
+    private class ArrayPoolBatchIterator
+            implements
+                CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+        private final String splitId;
+        private final DataIterator<T> inputIterator;
+        private final Pool<T[]> pool;
+
+        ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator, 
Pool<T[]> pool) {
+            this.splitId = splitId;
+            this.inputIterator = inputIterator;
+            this.pool = pool;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return inputIterator.hasNext();
+        }
+
+        @Override
+        public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+            if (!inputIterator.hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            T[] batch = getCachedEntry();
+            int recordCount = 0;
+            while (inputIterator.hasNext() && recordCount < batchSize) {
+                // The record produced by inputIterator can be reused like for 
the RowData case.
+                // inputIterator.next() can't be called again until the copy 
is made
+                // since the record is not consumed immediately.
+                T nextRecord = inputIterator.next();
+                recordFactory.clone(nextRecord, batch, recordCount);
+                recordCount++;
+                if (!inputIterator.currentFileHasNext()) {
+                    // break early so that records in the ArrayResultIterator
+                    // have the same fileOffset.
+                    break;
+                }
+            }
+
+            return ArrayBatchRecords.forRecords(
+                    splitId,
+                    pool.recycler(),
+                    batch,
+                    recordCount,
+                    inputIterator.fileOffset(),
+                    inputIterator.recordOffset() - recordCount);
+        }
+
+        @Override
+        public void close() throws IOException {
+            inputIterator.close();
+        }
+
+        private T[] getCachedEntry() {
+            try {
+                return pool.pollEntry();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting for 
array pool entry", e);
+            }
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
new file mode 100644
index 0000000000..beaf321e51
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Internal
+public class IcebergSourceReader<T>
+        extends
+            SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, 
IcebergSourceSplit, IcebergSourceSplit> {
+
+    public IcebergSourceReader(
+            InlongIcebergSourceReaderMetrics metrics,
+            ReaderFunction<T> readerFunction,
+            SourceReaderContext context) {
+        super(
+                () -> new IcebergSourceSplitReader<>(metrics, readerFunction, 
context),
+                new IcebergSourceRecordEmitter<>(),
+                context.getConfiguration(),
+                context);
+    }
+
+    @Override
+    public void start() {
+        // We request a split only if we did not get splits during the 
checkpoint restore.
+        // Otherwise, reader restarts will keep requesting more and more 
splits.
+        if (getNumberOfCurrentlyAssignedSplits() == 0) {
+            requestSplit(Collections.emptyList());
+        }
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, IcebergSourceSplit> 
finishedSplitIds) {
+        requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
+    }
+
+    @Override
+    protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
+        return split;
+    }
+
+    @Override
+    protected IcebergSourceSplit toSplitType(String splitId, 
IcebergSourceSplit splitState) {
+        return splitState;
+    }
+
+    private void requestSplit(Collection<String> finishedSplitIds) {
+        context.sendSourceEventToCoordinator(new 
SplitRequestEvent(finishedSplitIds));
+    }
+}
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
similarity index 50%
copy from 
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
copy to 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
index 0826eb2a9e..0a928069e0 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
@@ -15,33 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.base.util;
+package org.apache.inlong.sort.iceberg.source.reader;
 
-import org.apache.flink.table.data.binary.BinaryRowData;
-
-import java.nio.charset.StandardCharsets;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 
 /**
- * calculate tool for object
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
  */
-public class CalculateObjectSizeUtils {
+final class IcebergSourceRecordEmitter<T>
+        implements
+            RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
 
-    /**
-     * {@link BinaryRowData} don't implement the {@link Object#toString} method
-     * So, we need use {@link BinaryRowData#getSizeInBytes} to get byte size.
-     */
-    public static long getDataSize(Object object) {
-        if (object == null) {
-            return 0L;
-        }
-        long size;
-        if (object instanceof BinaryRowData) {
-            BinaryRowData binaryRowData = (BinaryRowData) object;
-            size = binaryRowData.getSizeInBytes();
-        } else {
-            size = object.toString().getBytes(StandardCharsets.UTF_8).length;
-        }
-        return size;
+    IcebergSourceRecordEmitter() {
     }
 
+    @Override
+    public void emitRecord(
+            RecordAndPosition<T> element, SourceOutput<T> output, 
IcebergSourceSplit split) {
+        output.collect(element.record());
+        split.updatePosition(element.fileOffset(), element.recordOffset());
+    }
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
new file mode 100644
index 0000000000..146c2dad7e
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Queue;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, 
IcebergSourceSplit> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSourceSplitReader.class);
+
+    private final InlongIcebergSourceReaderMetrics metrics;
+    private final ReaderFunction<T> openSplitFunction;
+    private final int indexOfSubtask;
+    private final Queue<IcebergSourceSplit> splits;
+
+    private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> 
currentReader;
+    private IcebergSourceSplit currentSplit;
+    private String currentSplitId;
+
+    IcebergSourceSplitReader(
+            InlongIcebergSourceReaderMetrics metrics,
+            ReaderFunction<T> openSplitFunction,
+            SourceReaderContext context) {
+        this.metrics = metrics;
+        this.openSplitFunction = openSplitFunction;
+        this.indexOfSubtask = context.getIndexOfSubtask();
+        this.splits = new ArrayDeque<>();
+    }
+
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws 
IOException {
+        metrics.incrementSplitReaderFetchCalls(1);
+        if (currentReader == null) {
+            IcebergSourceSplit nextSplit = splits.poll();
+            if (nextSplit != null) {
+                currentSplit = nextSplit;
+                currentSplitId = nextSplit.splitId();
+                currentReader = openSplitFunction.apply(currentSplit);
+            } else {
+                // return an empty result, which will lead to split fetch to 
be idle.
+                // SplitFetcherManager will then close idle fetcher.
+                return new RecordsBySplits(Collections.emptyMap(), 
Collections.emptySet());
+            }
+        }
+
+        if (currentReader.hasNext()) {
+            // Because Iterator#next() doesn't support checked exception,
+            // we need to wrap and unwrap the checked IOException with 
UncheckedIOException
+            try {
+                ArrayBatchRecords<T> result = (ArrayBatchRecords<T>) 
currentReader.next();
+                metrics.outputMetricsWithEstimate(result);
+                return result;
+            } catch (UncheckedIOException e) {
+                throw e.getCause();
+            }
+        } else {
+            return finishSplit();
+        }
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> 
splitsChange) {
+        if (!(splitsChange instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported split change: %s", 
splitsChange.getClass()));
+        }
+
+        LOG.info("Add {} splits to reader", splitsChange.splits().size());
+        splits.addAll(splitsChange.splits());
+        metrics.incrementAssignedSplits(splitsChange.splits().size());
+        metrics.incrementAssignedBytes(calculateBytes(splitsChange));
+    }
+
+    @Override
+    public void wakeUp() {
+    }
+
+    @Override
+    public void close() throws Exception {
+        currentSplitId = null;
+        if (currentReader != null) {
+            currentReader.close();
+        }
+    }
+
+    private long calculateBytes(IcebergSourceSplit split) {
+        return 
split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum);
+    }
+
+    private long calculateBytes(SplitsChange<IcebergSourceSplit> 
splitsChanges) {
+        return 
splitsChanges.splits().stream().map(this::calculateBytes).reduce(0L, Long::sum);
+    }
+
+    private ArrayBatchRecords<T> finishSplit() throws IOException {
+        if (currentReader != null) {
+            currentReader.close();
+            currentReader = null;
+        }
+
+        ArrayBatchRecords<T> finishRecords = 
ArrayBatchRecords.finishedSplit(currentSplitId);
+        LOG.info("Split reader {} finished split: {}", indexOfSubtask, 
currentSplitId);
+        metrics.incrementFinishedSplits(1);
+        metrics.incrementFinishedBytes(calculateBytes(currentSplit));
+        currentSplitId = null;
+        return finishRecords;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
new file mode 100644
index 0000000000..e7afec2551
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
+
+/**
+ * Inlong iceberg source reader metrics
+ */
+@Slf4j
+public class InlongIcebergSourceReaderMetrics extends 
IcebergSourceReaderMetrics {
+
+    private final MetricGroup metrics;
+    private SourceMetricData sourceMetricData;
+
+    public InlongIcebergSourceReaderMetrics(MetricGroup metrics, String 
fullTableName) {
+        super(metrics, fullTableName);
+        this.metrics = metrics;
+    }
+
+    public void registerMetrics(MetricOption metricOption) {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, metrics);
+        } else {
+            log.warn("failed to init sourceMetricData since the metricOption 
is null");
+        }
+    }
+
+    public void outputMetricsWithEstimate(ArrayBatchRecords batchRecord) {
+        sourceMetricData.outputMetricsWithEstimate(batchRecord.records());
+    }
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
new file mode 100644
index 0000000000..d7a5df351f
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.DataTaskReader;
+import org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Reading metadata tables (like snapshots, manifests, etc.)
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Internal
+public class MetaDataReaderFunction extends 
DataIteratorReaderFunction<RowData> {
+
+    private final Schema readSchema;
+    private final FileIO io;
+    private final EncryptionManager encryption;
+
+    public MetaDataReaderFunction(
+            ReadableConfig config,
+            Schema tableSchema,
+            Schema projectedSchema,
+            FileIO io,
+            EncryptionManager encryption,
+            MetadataConverter[] metadataConverters) {
+        super(
+                new ArrayPoolDataIteratorBatcher<>(
+                        config,
+                        new RowDataRecordFactory(
+                                
FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)),
+                                metadataConverters)));
+        this.readSchema = readSchema(tableSchema, projectedSchema);
+        this.io = io;
+        this.encryption = encryption;
+    }
+
+    @Override
+    public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
+        return new DataIterator<>(new DataTaskReader(readSchema), 
split.task(), io, encryption);
+    }
+
+    private static Schema readSchema(Schema tableSchema, Schema 
projectedSchema) {
+        Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
+        return projectedSchema == null ? tableSchema : projectedSchema;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
new file mode 100644
index 0000000000..1afa412042
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import java.io.Serializable;
+
+/**
+ * In FLIP-27 source, SplitReader#fetch() returns a batch of records. Since 
DataIterator for RowData
+ * returns an iterator of reused RowData objects, RecordFactory is needed to 
(1) create object array
+ * that is recyclable via pool. (2) clone RowData element from DataIterator to 
the batch array.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+interface RecordFactory<T> extends Serializable {
+
+    /** Create a batch of records */
+    T[] createBatch(int batchSize);
+
+    /** Clone record into the specified position of the batch array */
+    void clone(T from, T[] batch, int position);
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
new file mode 100644
index 0000000000..534cb6f7ac
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> 
{
+
+    private final Schema tableSchema;
+    private final Schema readSchema;
+    private final String nameMapping;
+    private final boolean caseSensitive;
+    private final FileIO io;
+    private final EncryptionManager encryption;
+    private final List<Expression> filters;
+
+    public RowDataReaderFunction(
+            ReadableConfig config,
+            Schema tableSchema,
+            Schema projectedSchema,
+            String nameMapping,
+            boolean caseSensitive,
+            FileIO io,
+            EncryptionManager encryption,
+            List<Expression> filters,
+            MetadataConverter[] metadataConverters) {
+        super(
+                new ArrayPoolDataIteratorBatcher<>(
+                        config,
+                        new RowDataRecordFactory(
+                                
FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)),
+                                metadataConverters)));
+        this.tableSchema = tableSchema;
+        this.readSchema = readSchema(tableSchema, projectedSchema);
+        this.nameMapping = nameMapping;
+        this.caseSensitive = caseSensitive;
+        this.io = io;
+        this.encryption = encryption;
+        this.filters = filters;
+    }
+
+    @Override
+    public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
+        return new DataIterator<>(
+                new RowDataFileScanTaskReader(tableSchema, readSchema, 
nameMapping, caseSensitive, filters),
+                split.task(),
+                io,
+                encryption);
+    }
+
+    private static Schema readSchema(Schema tableSchema, Schema 
projectedSchema) {
+        Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
+        return projectedSchema == null ? tableSchema : projectedSchema;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
new file mode 100644
index 0000000000..80110cf014
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+import org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData;
+import org.apache.inlong.sort.iceberg.source.utils.RowDataCloneUtil;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class RowDataRecordFactory implements RecordFactory<RowData> {
+
+    private final RowType rowType;
+    private final TypeSerializer[] fieldSerializers;
+    private final MetadataConverter[] metadataConverters;
+
+    RowDataRecordFactory(RowType rowType, MetadataConverter[] 
metadataConverters) {
+        this.rowType = rowType;
+        this.fieldSerializers = createFieldSerializers(rowType);
+        this.metadataConverters = metadataConverters;
+    }
+
+    static TypeSerializer[] createFieldSerializers(RowType rowType) {
+        return rowType.getChildren().stream()
+                .map(InternalSerializers::create)
+                .toArray(TypeSerializer[]::new);
+    }
+
+    @Override
+    public RowData[] createBatch(int batchSize) {
+        RowData[] arr = new RowData[batchSize];
+        for (int i = 0; i < batchSize; ++i) {
+            arr[i] = new RecyclableJoinedRowData(rowType.getFieldCount(), 
metadataConverters.length);
+        }
+        return arr;
+    }
+
+    @Override
+    public void clone(RowData from, RowData[] batch, int position) {
+
+        RecyclableJoinedRowData recyclable;
+        if (batch[position] instanceof RecyclableJoinedRowData) {
+            recyclable = (RecyclableJoinedRowData) batch[position];
+        } else {
+            recyclable = new RecyclableJoinedRowData(rowType.getFieldCount(), 
metadataConverters.length);
+        }
+
+        RowData physical =
+                RowDataCloneUtil.clonePhysical(from, 
recyclable.getPhysicalRowData(), rowType, fieldSerializers);
+        RowData meta = RowDataCloneUtil.cloneMeta(from, 
recyclable.getMetaRowData(), metadataConverters);
+        batch[position] = recyclable.replace(physical.getRowKind(), physical, 
meta);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
new file mode 100644
index 0000000000..28327d2d8d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+/**
+ * JoinedRowData that support recycle two member RowData by provide get() and 
set() interfaces.
+ */
+@PublicEvolving
+public class RecyclableJoinedRowData implements RowData {
+
+    private RowKind rowKind = RowKind.INSERT;
+    private RowData physicalRowData;
+    private RowData metaRowData;
+
+    public RecyclableJoinedRowData() {
+    }
+
+    public RecyclableJoinedRowData(int physicalSize, int metaSize) {
+        physicalRowData = new GenericRowData(physicalSize);
+        metaRowData = new GenericRowData(metaSize);
+    }
+
+    public RecyclableJoinedRowData replace(RowKind rowKind, RowData 
physicalRowData, RowData metaRowData) {
+        this.rowKind = rowKind;
+        this.physicalRowData = physicalRowData;
+        this.metaRowData = metaRowData;
+        return this;
+    }
+
+    public RowData getPhysicalRowData() {
+        return physicalRowData;
+    }
+
+    public RowData getMetaRowData() {
+        return metaRowData;
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+
+    @Override
+    public int getArity() {
+        return physicalRowData.getArity() + metaRowData.getArity();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return rowKind;
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        this.rowKind = kind;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.isNullAt(pos);
+        } else {
+            return metaRowData.isNullAt(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getBoolean(pos);
+        } else {
+            return metaRowData.getBoolean(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getByte(pos);
+        } else {
+            return metaRowData.getByte(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public short getShort(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getShort(pos);
+        } else {
+            return metaRowData.getShort(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public int getInt(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getInt(pos);
+        } else {
+            return metaRowData.getInt(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public long getLong(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getLong(pos);
+        } else {
+            return metaRowData.getLong(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getFloat(pos);
+        } else {
+            return metaRowData.getFloat(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getDouble(pos);
+        } else {
+            return metaRowData.getDouble(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getString(pos);
+        } else {
+            return metaRowData.getString(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getDecimal(pos, precision, scale);
+        } else {
+            return metaRowData.getDecimal(pos - physicalRowData.getArity(), 
precision, scale);
+        }
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getTimestamp(pos, precision);
+        } else {
+            return metaRowData.getTimestamp(pos - physicalRowData.getArity(), 
precision);
+        }
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getRawValue(pos);
+        } else {
+            return metaRowData.getRawValue(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getBinary(pos);
+        } else {
+            return metaRowData.getBinary(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getArray(pos);
+        } else {
+            return metaRowData.getArray(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getMap(pos);
+        } else {
+            return metaRowData.getMap(pos - physicalRowData.getArity());
+        }
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        if (pos < physicalRowData.getArity()) {
+            return physicalRowData.getRow(pos, numFields);
+        } else {
+            return metaRowData.getRow(pos - physicalRowData.getArity(), 
numFields);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RecyclableJoinedRowData that = (RecyclableJoinedRowData) o;
+        return Objects.equals(rowKind, that.rowKind)
+                && Objects.equals(this.physicalRowData, that.physicalRowData)
+                && Objects.equals(this.metaRowData, that.metaRowData);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowKind, physicalRowData, metaRowData);
+    }
+
+    @Override
+    public String toString() {
+        return rowKind.shortString() + "{" + "physicalRowData=" + 
physicalRowData + ", metaRowData=" + metaRowData
+                + '}';
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java
new file mode 100644
index 0000000000..58e541f0f1
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.utils;
+
+import 
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Support clone meta and physical RowData.
+ */
+public class RowDataCloneUtil {
+
+    private RowDataCloneUtil() {
+    }
+
+    public static RowData cloneMeta(
+            RowData from, RowData reuse, MetadataConverter[] converters) {
+        GenericRowData ret;
+        if (reuse instanceof GenericRowData) {
+            ret = (GenericRowData) reuse;
+        } else {
+            ret = new GenericRowData(from.getArity());
+        }
+
+        for (int i = 0; i < converters.length; i++) {
+            Object meta = converters[i].read(from);
+            ret.setField(i, meta);
+        }
+
+        return ret;
+    }
+
+    public static RowData clonePhysical(
+            RowData from, RowData reuse, RowType rowType, TypeSerializer[] 
fieldSerializers) {
+        GenericRowData ret;
+        if (reuse instanceof GenericRowData) {
+            ret = (GenericRowData) reuse;
+        } else {
+            ret = new GenericRowData(from.getArity());
+        }
+        ret.setRowKind(from.getRowKind());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            if (!from.isNullAt(i)) {
+                RowData.FieldGetter getter = 
RowData.createFieldGetter(rowType.getTypeAt(i), i);
+                ret.setField(i, 
fieldSerializers[i].copy(getter.getFieldOrNull(from)));
+            } else {
+                ret.setField(i, null);
+            }
+        }
+        return ret;
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
index ddbf4ebba4..2c5f0dd56f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
@@ -69,6 +69,11 @@
             <artifactId>flink-table-common</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index d9f7cf28f3..e83548b61e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -768,6 +768,16 @@
          
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
          
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
          
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/ArrayBatchRecords.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/ArrayPoolDataIteratorBatcher.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/IcebergSourceReader.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/IcebergSourceRecordEmitter.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/IcebergSourceSplitReader.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/InlongIcebergSourceReaderMetrics.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/MetaDataReaderFunction.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/RecordFactory.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/RowDataReaderFunction.java
+         
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/reader/RowDataRecordFactory.java
 
  Source  : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the 
software have been modified.)
  License : https://github.com/apache/iceberg/LICENSE
diff --git a/pom.xml b/pom.xml
index 576f0c73e6..d30034e7a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -205,6 +205,7 @@
         <otel.version>1.28.0</otel.version>
         <tencentcloud-api.version>3.1.830</tencentcloud-api.version>
         <woodstox-core.version>5.4.0</woodstox-core.version>
+        <libfb303.version>0.9.3</libfb303.version>
     </properties>
 
     <dependencyManagement>

Reply via email to