This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dc12a237da [INLONG-8838][Sort] IcebergSource support metadata (#8895)
dc12a237da is described below
commit dc12a237da7ef791b4e76d3247014b8f4c5d9235
Author: vernedeng <[email protected]>
AuthorDate: Mon Sep 18 11:59:16 2023 +0800
[INLONG-8838][Sort] IcebergSource support metadata (#8895)
---
inlong-manager/manager-service/pom.xml | 58 +--
.../resource/sink/hudi/HudiCatalogClient.java | 53 +-
.../manager/service/sort/SortServiceImplTest.java | 11 +-
.../org/apache/inlong/sort/base/Constants.java | 2 +
.../inlong/sort/base/metric/SourceMetricData.java | 6 +
.../sort/base/util/CalculateObjectSizeUtils.java | 7 +
.../sort-connectors/iceberg/pom.xml | 34 +-
.../sort/iceberg/FlinkDynamicTableFactory.java | 3 +-
.../sort/iceberg/IcebergReadableMetadata.java | 65 +++
.../inlong/sort/iceberg/source/IcebergSource.java | 565 +++++++++++++++++++++
.../sort/iceberg/source/IcebergTableSource.java | 270 ++++++++++
.../iceberg/source/reader/ArrayBatchRecords.java | 179 +++++++
.../reader/ArrayPoolDataIteratorBatcher.java | 138 +++++
.../iceberg/source/reader/IcebergSourceReader.java | 79 +++
.../source/reader/IcebergSourceRecordEmitter.java} | 38 +-
.../source/reader/IcebergSourceSplitReader.java | 143 ++++++
.../reader/InlongIcebergSourceReaderMetrics.java | 53 ++
.../source/reader/MetaDataReaderFunction.java | 74 +++
.../sort/iceberg/source/reader/RecordFactory.java | 36 ++
.../source/reader/RowDataReaderFunction.java | 88 ++++
.../source/reader/RowDataRecordFactory.java | 74 +++
.../source/utils/RecyclableJoinedRowData.java | 251 +++++++++
.../iceberg/source/utils/RowDataCloneUtil.java | 71 +++
.../sort-flink-dependencies/pom.xml | 5 +
licenses/inlong-sort-connectors/LICENSE | 10 +
pom.xml | 1 +
26 files changed, 2220 insertions(+), 94 deletions(-)
diff --git a/inlong-manager/manager-service/pom.xml
b/inlong-manager/manager-service/pom.xml
index fe32b68874..d6f1aff1d4 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -252,63 +252,9 @@
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.calcite</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.calcite.avatica</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.curator</groupId>
- <artifactId>apache-curator</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-registry</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-llap-tez</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-vector-code-gen</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.ivy</groupId>
- <artifactId>ivy</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.groovy</groupId>
- <artifactId>groovy-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.antlr</groupId>
- <artifactId>ST4</artifactId>
- </exclusion>
- <exclusion>
- <groupId>stax</groupId>
- <artifactId>stax-api</artifactId>
- </exclusion>
- </exclusions>
+ <artifactId>hive-standalone-metastore</artifactId>
+ <version>${hive3x.version}</version>
</dependency>
-
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 972fe52a9b..defa95f8c7 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -23,21 +23,29 @@ import
org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -81,7 +89,7 @@ public class HudiCatalogClient implements AutoCloseable {
public void open() {
if (this.client == null) {
try {
- this.client = Hive.get(hiveConf).getMSC();
+ this.client = new HiveMetaStoreClient(hiveConf);
} catch (Exception e) {
throw new RuntimeException("Failed to create hive metastore
client", e);
}
@@ -188,7 +196,7 @@ public class HudiCatalogClient implements AutoCloseable {
HudiTableInfo tableInfo,
boolean useRealTimeInputFormat)
throws TException, IOException {
- Table hiveTable =
org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, tableName);
+ Table hiveTable = this.getEmptyTable(dbName, tableName);
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
@@ -250,4 +258,43 @@ public class HudiCatalogClient implements AutoCloseable {
}
}
+ public Table getEmptyTable(String databaseName, String tableName) {
+ StorageDescriptor sd = new StorageDescriptor();
+ {
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.setNumBuckets(-1);
+ sd.setBucketCols(new ArrayList<String>());
+ sd.setCols(new ArrayList<FieldSchema>());
+ sd.setParameters(new HashMap<String, String>());
+ sd.setSortCols(new ArrayList<Order>());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ // We have to use MetadataTypedColumnsetSerDe because
LazySimpleSerDe does
+ // not support a table with no columns.
+
sd.getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+
sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
+ sd.setInputFormat(SequenceFileInputFormat.class.getName());
+ SkewedInfo skewInfo = new SkewedInfo();
+ skewInfo.setSkewedColNames(new ArrayList<String>());
+ skewInfo.setSkewedColValues(new ArrayList<List<String>>());
+ skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>,
String>());
+ sd.setSkewedInfo(skewInfo);
+ }
+
+ org.apache.hadoop.hive.metastore.api.Table t = new
org.apache.hadoop.hive.metastore.api.Table();
+ {
+ t.setSd(sd);
+ t.setPartitionKeys(new ArrayList<FieldSchema>());
+ t.setParameters(new HashMap<String, String>());
+ t.setTableType(TableType.MANAGED_TABLE.toString());
+ t.setDbName(databaseName);
+ t.setTableName(tableName);
+ // set create time
+ t.setCreateTime((int) (System.currentTimeMillis() / 1000));
+ }
+ // Explicitly set the bucketing version
+ t.getParameters().put(hive_metastoreConstants.TABLE_BUCKETING_VERSION,
+ "2");
+ return t;
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 993ffd5734..5032057db0 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -49,7 +49,6 @@ import org.apache.inlong.manager.service.node.DataNodeService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
-import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -140,9 +139,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Order(2)
@Transactional
public void testSourceCorrectParamsOfNoTagSortCluster() {
- SortSourceConfigResponse response =
sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
- JSONObject jo = new JSONObject(response);
- System.out.println(jo);
+ SortSourceConfigResponse response =
sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");;
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getData());
Assertions.assertNotNull(response.getMd5());
@@ -193,8 +190,6 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Transactional
public void testClusterCorrectParams() {
SortClusterResponse response =
sortService.getClusterConfig(TEST_CLUSTER_1, "");
- JSONObject jo = new JSONObject(response);
- System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getData());
Assertions.assertNotNull(response.getMd5());
@@ -232,8 +227,6 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Transactional
public void testSourceCorrectParamsOfTaggedSortCluster() {
SortSourceConfigResponse response =
sortService.getSourceConfig(TEST_CLUSTER_2, TEST_TASK_2, "");
- JSONObject jo = new JSONObject(response);
- System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getMd5());
Assertions.assertNotNull(response.getMsg());
@@ -248,8 +241,6 @@ public class SortServiceImplTest extends ServiceBaseTest {
Assertions.assertEquals(1, zone.getTopics().size());
response = sortService.getSourceConfig(TEST_CLUSTER_3, TEST_TASK_3,
"");
- jo = new JSONObject(response);
- System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
Assertions.assertNotNull(response.getMd5());
Assertions.assertNotNull(response.getMsg());
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index c0fbed00c7..72ce9d89b8 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -167,6 +167,8 @@ public final class Constants {
public static final String META_INCREMENTAL = "incremental_inlong";
+ public static final String META_INLONG_DATA_TIME = "inlong_data_time";
+
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index ec64f53b39..c77f688045 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -38,6 +38,7 @@ import static
org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataArraySize;
import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
/**
@@ -242,6 +243,11 @@ public class SourceMetricData implements MetricData {
outputMetrics(1, getDataSize(data));
}
+ public void outputMetricsWithEstimate(Object[] records) {
+ long size = getDataArraySize(records);
+ outputMetrics(records.length, size);
+ }
+
public void outputMetricsWithEstimate(Object data, long fetchDelay, long
emitDelay) {
outputMetrics(1, getDataSize(data));
this.fetchDelay = fetchDelay;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
index 0826eb2a9e..748522693b 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.base.util;
import org.apache.flink.table.data.binary.BinaryRowData;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
/**
* calculate tool for object
@@ -44,4 +45,10 @@ public class CalculateObjectSizeUtils {
return size;
}
+ public static long getDataArraySize(Object[] objects) {
+ return Arrays.stream(objects)
+ .mapToLong(CalculateObjectSizeUtils::getDataSize)
+ .sum();
+ }
+
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
index c481dd8de6..81168093ba 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/pom.xml
@@ -39,6 +39,18 @@
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libfb303</artifactId>
+ <version>${libfb303.version}</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
@@ -62,6 +74,7 @@
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive3x.version}</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -79,6 +92,17 @@
<artifactId>iceberg-hive-metastore</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version.v1.15}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
@@ -87,7 +111,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${plugin.shade.version}</version>
-
<executions>
<execution>
<id>shade-flink</id>
@@ -108,7 +131,8 @@
<include>org.apache.hive:*</include>
<!-- Include fixed version 18.0-13.0 of
flink shaded guava -->
<include>org.apache.flink:flink-shaded-guava</include>
- <include>com.google.protobuf:*</include>
+
<include>org.apache.flink:flink-connector-files</include>
+
<include>org.apache.flink:flink-connector-base</include>
<include>org.apache.thrift:*</include>
<include>com.facebook.*:*</include>
</includes>
@@ -123,6 +147,12 @@
<include>META-INF/services/org.apache.flink.table.factories.TableFactory</include>
</includes>
</filter>
+ <filter>
+
<artifact>org.apache.hive:hive-exec</artifact>
+ <excludes>
+
<exclude>com/google/protobuf/**</exclude>
+ </excludes>
+ </filter>
</filters>
<relocations>
<relocation>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 1edf546c23..ef50b8d3a8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.iceberg;
+import org.apache.inlong.sort.iceberg.source.IcebergTableSource;
+
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -38,7 +40,6 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.flink.IcebergTableSink;
import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.source.IcebergTableSource;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
new file mode 100644
index 0000000000..e003555a45
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergReadableMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg;
+
+import org.apache.inlong.sort.base.Constants;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+
+/**
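+ * Metadata readable from the Iceberg source; each constant pairs a key with its data type and a converter applied to a RowData.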
+ */
+public enum IcebergReadableMetadata {
+
+ INLONG_DATA_TIME(
+ Constants.META_INLONG_DATA_TIME,
+ DataTypes.BIGINT().notNull(),
+ r -> System.currentTimeMillis());
+
+ private final String key;
+ private final DataType dataType;
+ private final MetadataConverter converter;
+
+ IcebergReadableMetadata(String key, DataType dataType, MetadataConverter
converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public MetadataConverter getConverter() {
+ return converter;
+ }
+
+ public interface MetadataConverter extends Serializable {
+
+ Object read(RowData rowData);
+ }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
new file mode 100644
index 0000000000..47f7d04b44
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergSource.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+import org.apache.inlong.sort.iceberg.source.reader.IcebergSourceReader;
+import
org.apache.inlong.sort.iceberg.source.reader.InlongIcebergSourceReaderMetrics;
+import org.apache.inlong.sort.iceberg.source.reader.MetaDataReaderFunction;
+import org.apache.inlong.sort.iceberg.source.reader.RowDataReaderFunction;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.iceberg.BaseMetadataTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkReadOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.FlinkSplitPlanner;
+import org.apache.iceberg.flink.source.ScanContext;
+import org.apache.iceberg.flink.source.StreamingStartingStrategy;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Copied from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit,
IcebergEnumeratorState> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergSource.class);
+
+ private final TableLoader tableLoader;
+ private final ScanContext scanContext;
+ private final ReaderFunction<T> readerFunction;
+ private final SplitAssignerFactory assignerFactory;
+
+ private final MetricOption metricOption;
+
+ // Can't use SerializableTable as enumerator needs a regular table
+ // that can discover table changes
+ private transient Table table;
+
+ IcebergSource(
+ TableLoader tableLoader,
+ ScanContext scanContext,
+ ReaderFunction<T> readerFunction,
+ SplitAssignerFactory assignerFactory,
+ Table table,
+ MetricOption metricOption) {
+ this.tableLoader = tableLoader;
+ this.scanContext = scanContext;
+ this.readerFunction = readerFunction;
+ this.assignerFactory = assignerFactory;
+ this.table = table;
+ this.metricOption = metricOption;
+ }
+
+ String name() {
+ return "IcebergSource-" + lazyTable().name();
+ }
+
+ private String planningThreadName() {
+ // Ideally, operatorId should be used as the threadPoolName as Flink
guarantees its uniqueness
+ // within a job. SplitEnumeratorContext doesn't expose the
OperatorCoordinator.Context, which
+ // would contain the OperatorID. Need to discuss with Flink community
whether it is ok to expose
+ // a public API like the protected method "OperatorCoordinator.Context
getCoordinatorContext()"
+ // from SourceCoordinatorContext implementation. For now, <table
name>-<random UUID> is used as
+ // the unique thread pool name.
+ return lazyTable().name() + "-" + UUID.randomUUID();
+ }
+
+ private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+ ExecutorService workerPool =
+ ThreadPools.newWorkerPool(threadName,
scanContext.planParallelism());
+ try {
+ List<IcebergSourceSplit> splits =
+ FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(),
scanContext, workerPool);
+ LOG.info(
+ "Discovered {} splits from table {} during job
initialization",
+ splits.size(),
+ lazyTable().name());
+ return splits;
+ } finally {
+ workerPool.shutdown();
+ }
+ }
+
+ private Table lazyTable() {
+ if (table == null) {
+ tableLoader.open();
+ try (TableLoader loader = tableLoader) {
+ this.table = loader.loadTable();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close table loader",
e);
+ }
+ }
+
+ return table;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED :
Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<T, IcebergSourceSplit>
createReader(SourceReaderContext readerContext) {
+ InlongIcebergSourceReaderMetrics metrics =
+ new
InlongIcebergSourceReaderMetrics(readerContext.metricGroup(),
lazyTable().name());
+ metrics.registerMetrics(metricOption);
+ return new IcebergSourceReader<>(metrics, readerFunction,
readerContext);
+ }
+
+ @Override
+ public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>
createEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumContext) {
+ return createEnumerator(enumContext, null);
+ }
+
+ @Override
+ public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>
restoreEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumContext,
IcebergEnumeratorState enumState) {
+ return createEnumerator(enumContext, enumState);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
+ return IcebergSourceSplitSerializer.INSTANCE;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<IcebergEnumeratorState>
getEnumeratorCheckpointSerializer() {
+ return IcebergEnumeratorStateSerializer.INSTANCE;
+ }
+
+ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState>
createEnumerator(
+ SplitEnumeratorContext<IcebergSourceSplit> enumContext,
+ @Nullable IcebergEnumeratorState enumState) {
+ SplitAssigner assigner;
+ if (enumState == null) {
+ assigner = assignerFactory.createAssigner();
+ } else {
+ LOG.info(
+ "Iceberg source restored {} splits from state for table
{}",
+ enumState.pendingSplits().size(),
+ lazyTable().name());
+ assigner =
assignerFactory.createAssigner(enumState.pendingSplits());
+ }
+
+ if (scanContext.isStreaming()) {
+ ContinuousSplitPlanner splitPlanner =
+ new ContinuousSplitPlannerImpl(tableLoader.clone(),
scanContext, planningThreadName());
+ return new ContinuousIcebergEnumerator(
+ enumContext, assigner, scanContext, splitPlanner,
enumState);
+ } else {
+ List<IcebergSourceSplit> splits =
planSplitsForBatch(planningThreadName());
+ assigner.onDiscoveredSplits(splits);
+ return new StaticIcebergEnumerator(enumContext, assigner);
+ }
+ }
+
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ public static Builder<RowData> forRowData() {
+ return new Builder<>();
+ }
+
+ public static class Builder<T> {
+
+ private TableLoader tableLoader;
+ private Table table;
+ private SplitAssignerFactory splitAssignerFactory;
+ private ReaderFunction<T> readerFunction;
+ private ReadableConfig flinkConfig = new Configuration();
+ private final ScanContext.Builder contextBuilder =
ScanContext.builder();
+ private TableSchema projectedFlinkSchema;
+ private Boolean exposeLocality;
+ private MetadataConverter[] metadataConverters;
+ private MetricOption metricOption;
+ private String inlongAuditAddress;
+ private String inlongAuditKeys;
+ private String inlongMetrics;
+
+ private final Map<String, String> readOptions = Maps.newHashMap();
+
+ Builder() {
+ }
+
+ public Builder<T> metricOption(MetricOption metricOption) {
+ this.metricOption = metricOption;
+ return this;
+ }
+
+ public Builder<T> inlongAuditAddress(String inlongAuditAddress) {
+ this.inlongAuditAddress = inlongAuditAddress;
+ return this;
+ }
+
+ public Builder<T> inlongAuditKeys(String inlongAuditKeys) {
+ this.inlongAuditKeys = inlongAuditKeys;
+ return this;
+ }
+
+ public Builder<T> inlongMetrics(String inlongMetrics) {
+ this.inlongMetrics = inlongMetrics;
+ return this;
+ }
+
+ public Builder<T> metadataConverters(MetadataConverter[]
metadataConverters) {
+ this.metadataConverters = metadataConverters;
+ return this;
+ }
+
+ public Builder<T> tableLoader(TableLoader loader) {
+ this.tableLoader = loader;
+ return this;
+ }
+
+ public Builder<T> table(Table newTable) {
+ this.table = newTable;
+ return this;
+ }
+
+ public Builder<T> assignerFactory(SplitAssignerFactory
assignerFactory) {
+ this.splitAssignerFactory = assignerFactory;
+ return this;
+ }
+
+ public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
+ this.readerFunction = newReaderFunction;
+ return this;
+ }
+
+ public Builder<T> flinkConfig(ReadableConfig config) {
+ this.flinkConfig = config;
+ return this;
+ }
+
+ public Builder<T> caseSensitive(boolean newCaseSensitive) {
+ readOptions.put(FlinkReadOptions.CASE_SENSITIVE,
Boolean.toString(newCaseSensitive));
+ return this;
+ }
+
+ /**
+  * Stores the snapshot id in the read options; a {@code null} argument leaves them untouched.
+  *
+  * @param newSnapshotId snapshot id, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> useSnapshotId(Long newSnapshotId) {
+     if (newSnapshotId == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.SNAPSHOT_ID.key(), newSnapshotId.toString());
+     return this;
+ }
+
+ public Builder<T> streamingStartingStrategy(StreamingStartingStrategy
newStartingStrategy) {
+ readOptions.put(FlinkReadOptions.STARTING_STRATEGY,
newStartingStrategy.name());
+ return this;
+ }
+
+ public Builder<T> startSnapshotTimestamp(Long
newStartSnapshotTimestamp) {
+ if (newStartSnapshotTimestamp != null) {
+ readOptions.put(
+ FlinkReadOptions.START_SNAPSHOT_TIMESTAMP.key(),
+ Long.toString(newStartSnapshotTimestamp));
+ }
+ return this;
+ }
+
+ public Builder<T> startSnapshotId(Long newStartSnapshotId) {
+ if (newStartSnapshotId != null) {
+ readOptions.put(
+ FlinkReadOptions.START_SNAPSHOT_ID.key(),
Long.toString(newStartSnapshotId));
+ }
+ return this;
+ }
+
+ public Builder<T> tag(String tag) {
+ readOptions.put(FlinkReadOptions.TAG.key(), tag);
+ return this;
+ }
+
+ public Builder<T> branch(String branch) {
+ readOptions.put(FlinkReadOptions.BRANCH.key(), branch);
+ return this;
+ }
+
+ public Builder<T> startTag(String startTag) {
+ readOptions.put(FlinkReadOptions.START_TAG.key(), startTag);
+ return this;
+ }
+
+ public Builder<T> endTag(String endTag) {
+ readOptions.put(FlinkReadOptions.END_TAG.key(), endTag);
+ return this;
+ }
+
+ /**
+  * Stores the end snapshot id in the read options; a {@code null} argument leaves them untouched.
+  *
+  * @param newEndSnapshotId snapshot id, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> endSnapshotId(Long newEndSnapshotId) {
+     if (newEndSnapshotId == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.END_SNAPSHOT_ID.key(), newEndSnapshotId.toString());
+     return this;
+ }
+
+ /**
+  * Stores the as-of timestamp in the read options; a {@code null} argument leaves them untouched.
+  *
+  * @param newAsOfTimestamp timestamp value, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> asOfTimestamp(Long newAsOfTimestamp) {
+     if (newAsOfTimestamp == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.AS_OF_TIMESTAMP.key(), newAsOfTimestamp.toString());
+     return this;
+ }
+
+ /**
+  * Stores the split size in the read options; a {@code null} argument leaves them untouched.
+  *
+  * @param newSplitSize split size, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> splitSize(Long newSplitSize) {
+     if (newSplitSize == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.SPLIT_SIZE, newSplitSize.toString());
+     return this;
+ }
+
+ /**
+  * Stores the split lookback in the read options; a {@code null} argument leaves them untouched.
+  *
+  * @param newSplitLookback lookback value, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> splitLookback(Integer newSplitLookback) {
+     if (newSplitLookback == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.SPLIT_LOOKBACK, newSplitLookback.toString());
+     return this;
+ }
+
+ /**
+  * Stores the split open-file cost in the read options; a {@code null} argument leaves them
+  * untouched.
+  *
+  * @param newSplitOpenFileCost open-file cost, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> splitOpenFileCost(Long newSplitOpenFileCost) {
+     if (newSplitOpenFileCost == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.SPLIT_FILE_OPEN_COST, newSplitOpenFileCost.toString());
+     return this;
+ }
+
+ public Builder<T> streaming(boolean streaming) {
+ readOptions.put(FlinkReadOptions.STREAMING,
Boolean.toString(streaming));
+ return this;
+ }
+
+ public Builder<T> monitorInterval(Duration newMonitorInterval) {
+ if (newMonitorInterval != null) {
+ readOptions.put(FlinkReadOptions.MONITOR_INTERVAL,
newMonitorInterval.toNanos() + " ns");
+ }
+ return this;
+ }
+
+ public Builder<T> nameMapping(String newNameMapping) {
+ readOptions.put(TableProperties.DEFAULT_NAME_MAPPING,
newNameMapping);
+ return this;
+ }
+
+ /**
+  * Forwards an Iceberg projection schema directly to the scan context builder.
+  *
+  * @param newProjectedSchema projected Iceberg schema
+  * @return this builder
+  */
+ public Builder<T> project(Schema newProjectedSchema) {
+     contextBuilder.project(newProjectedSchema);
+     return this;
+ }
+
+ /**
+  * Remembers a projected Flink schema; {@code build()} converts it against the table schema and
+  * applies it to the scan context.
+  *
+  * @param newProjectedFlinkSchema projected Flink schema
+  * @return this builder
+  */
+ public Builder<T> project(TableSchema newProjectedFlinkSchema) {
+     this.projectedFlinkSchema = newProjectedFlinkSchema;
+     return this;
+ }
+
+ /**
+  * Forwards the filter expressions to the scan context builder.
+  *
+  * @param newFilters filter expressions
+  * @return this builder
+  */
+ public Builder<T> filters(List<Expression> newFilters) {
+     contextBuilder.filters(newFilters);
+     return this;
+ }
+
+ /**
+  * Stores the limit in the read options; a {@code null} argument leaves them untouched.
+  *
+  * @param newLimit limit value, may be {@code null}
+  * @return this builder
+  */
+ public Builder<T> limit(Long newLimit) {
+     if (newLimit == null) {
+         return this;
+     }
+     readOptions.put(FlinkReadOptions.LIMIT, newLimit.toString());
+     return this;
+ }
+
+ public Builder<T> includeColumnStats(boolean newIncludeColumnStats) {
+ readOptions.put(
+ FlinkReadOptions.INCLUDE_COLUMN_STATS,
Boolean.toString(newIncludeColumnStats));
+ return this;
+ }
+
+ public Builder<T> planParallelism(int planParallelism) {
+ readOptions.put(
+
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.key(),
+ Integer.toString(planParallelism));
+ return this;
+ }
+
+ /**
+  * Records the expose-locality flag on this builder.
+  *
+  * @param newExposeLocality flag value
+  * @return this builder
+  */
+ public Builder<T> exposeLocality(boolean newExposeLocality) {
+     this.exposeLocality = Boolean.valueOf(newExposeLocality);
+     return this;
+ }
+
+ public Builder<T> maxAllowedPlanningFailures(int
maxAllowedPlanningFailures) {
+ readOptions.put(
+
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.key(),
+ Integer.toString(maxAllowedPlanningFailures));
+ return this;
+ }
+
+ /**
+  * Sets a single read property for the Flink source. The supported properties are listed in
+  * {@link FlinkReadOptions}.
+  *
+  * @param property property key
+  * @param value property value
+  * @return this builder
+  */
+ public Builder<T> set(String property, String value) {
+     this.readOptions.put(property, value);
+     return this;
+ }
+
+ /**
+  * Copies every entry of the given map into the read properties of the Flink source. The
+  * supported properties are listed in {@link FlinkReadOptions}.
+  *
+  * @param properties properties to add; existing entries with the same key are overwritten
+  * @return this builder
+  */
+ public Builder<T> setAll(Map<String, String> properties) {
+     this.readOptions.putAll(properties);
+     return this;
+ }
+
+ /**
+  * Copies every entry of the given map into the read properties.
+  *
+  * @param properties properties to add
+  * @return this builder
+  * @deprecated Use {@link #setAll} instead.
+  */
+ @Deprecated
+ public Builder<T> properties(Map<String, String> properties) {
+     this.readOptions.putAll(properties);
+     return this;
+ }
+
+ /**
+  * Assembles the {@link IcebergSource} from the collected settings.
+  *
+  * <p>Steps: load the table through the table loader when no table was supplied, resolve the
+  * scan context from the read options and the Flink configuration, apply the projected Flink
+  * schema if one was set, create a default reader function when the caller did not provide one,
+  * and make sure a metric option is available.
+  *
+  * @return the configured source
+  * @throws NullPointerException if a required setting is missing
+  * @throws UncheckedIOException if an {@link IOException} is raised while loading the table
+  */
+ public IcebergSource<T> build() {
+     // Validate up front: otherwise a missing loader surfaces as a message-less
+     // NullPointerException from inside the try-with-resources block below.
+     Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+     if (table == null) {
+         try (TableLoader loader = tableLoader) {
+             loader.open();
+             this.table = loader.loadTable();
+         } catch (IOException e) {
+             throw new UncheckedIOException(e);
+         }
+     }
+
+     if (metadataConverters == null) {
+         metadataConverters = new MetadataConverter[0];
+     }
+
+     contextBuilder.resolveConfig(table, readOptions, flinkConfig);
+
+     Schema icebergSchema = table.schema();
+     if (projectedFlinkSchema != null) {
+         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
+     }
+
+     ScanContext context = contextBuilder.build();
+     if (readerFunction == null) {
+         this.readerFunction = createDefaultReaderFunction(context);
+     }
+     resolveMetricOption();
+     checkRequired();
+     // The builder has already loaded the table, so pass it along to avoid loading it twice.
+     return new IcebergSource<T>(
+             tableLoader, context, readerFunction, splitAssignerFactory, table, metricOption);
+ }
+
+ /**
+  * Creates the reader function used when the caller did not set one: a
+  * {@link MetaDataReaderFunction} for metadata tables, a {@link RowDataReaderFunction} otherwise.
+  */
+ @SuppressWarnings("unchecked")
+ private ReaderFunction<T> createDefaultReaderFunction(ScanContext context) {
+     if (table instanceof BaseMetadataTable) {
+         MetaDataReaderFunction metaDataReaderFunction =
+                 new MetaDataReaderFunction(
+                         flinkConfig,
+                         table.schema(),
+                         context.project(),
+                         table.io(),
+                         table.encryption(),
+                         metadataConverters);
+         return (ReaderFunction<T>) metaDataReaderFunction;
+     }
+     RowDataReaderFunction rowDataReaderFunction =
+             new RowDataReaderFunction(
+                     flinkConfig,
+                     table.schema(),
+                     context.project(),
+                     context.nameMapping(),
+                     context.caseSensitive(),
+                     table.io(),
+                     table.encryption(),
+                     context.filters(),
+                     metadataConverters);
+     return (ReaderFunction<T>) rowDataReaderFunction;
+ }
+
+ /**
+  * Verifies the settings every source needs. The individual inlong audit/metric strings are
+  * validated in {@link #resolveMetricOption()} instead, because they are only needed when the
+  * metric option has to be derived from them.
+  */
+ private void checkRequired() {
+     Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+     Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
+     Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
+     Preconditions.checkNotNull(metricOption, "metricOption is required.");
+ }
+
+ /**
+  * Ensures {@code metricOption} is populated. A caller-supplied option is kept as is; otherwise
+  * one is built from the audit address, audit keys and metric labels, each of which falls back
+  * to the corresponding read option when it was not set (or is blank) on the builder.
+  *
+  * @throws NullPointerException if the option must be derived and one of the three values is
+  *     still missing after the fallback
+  */
+ private void resolveMetricOption() {
+     if (metricOption != null) {
+         LOG.info("metric option is not null, no need to init it");
+         return;
+     }
+
+     if (StringUtils.isNullOrWhitespaceOnly(inlongAuditAddress)) {
+         inlongAuditAddress = readOptions.get(INLONG_AUDIT.key());
+     }
+     if (StringUtils.isNullOrWhitespaceOnly(inlongAuditKeys)) {
+         inlongAuditKeys = readOptions.get(AUDIT_KEYS.key());
+     }
+     if (StringUtils.isNullOrWhitespaceOnly(inlongMetrics)) {
+         inlongMetrics = readOptions.get(INLONG_METRIC.key());
+     }
+     LOG.info("start to init metric option with audit={}, keys={}, metric={}",
+             inlongAuditAddress, inlongAuditKeys, inlongMetrics);
+     // These values only matter on this path; checking them here keeps a caller-supplied
+     // metric option from being rejected for not also setting the individual strings.
+     Preconditions.checkNotNull(inlongAuditAddress, "inlongAuditAddress is required.");
+     Preconditions.checkNotNull(inlongAuditKeys, "inlongAuditKeys is required.");
+     Preconditions.checkNotNull(inlongMetrics, "inlongMetrics is required.");
+     this.metricOption = MetricOption.builder()
+             .withInlongLabels(inlongMetrics)
+             .withAuditAddress(inlongAuditAddress)
+             .withAuditKeys(inlongAuditKeys)
+             .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+             .build();
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
new file mode 100644
index 0000000000..f55f63fadd
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/IcebergTableSource.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source;
+
+import org.apache.inlong.sort.iceberg.IcebergReadableMetadata;
+import
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkFilters;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.FlinkSource;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerType;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Flink Iceberg table source that supports projection, filter and limit push-down and exposes
+ * the readable metadata declared in {@link IcebergReadableMetadata}.
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Internal
+public class IcebergTableSource
+        implements
+            ScanTableSource,
+            SupportsProjectionPushDown,
+            SupportsFilterPushDown,
+            SupportsLimitPushDown,
+            SupportsReadingMetadata {
+
+    // Push-down state, filled in by the apply* callbacks.
+    private int[] projectedFields;
+    private Long limit;
+    private List<Expression> filters;
+    // Row type handed to the runtime provider; replaced by applyReadableMetadata.
+    protected DataType producedDataType;
+    // Metadata keys requested through applyReadableMetadata.
+    protected List<String> metadataKeys;
+
+    private final TableLoader loader;
+    private final TableSchema schema;
+    private final Map<String, String> properties;
+    private final boolean isLimitPushDown;
+    private final ReadableConfig readableConfig;
+
+    /** Copy constructor backing {@link #copy()}; all references are shared with the original. */
+    private IcebergTableSource(IcebergTableSource toCopy) {
+        this.loader = toCopy.loader;
+        this.schema = toCopy.schema;
+        this.properties = toCopy.properties;
+        this.projectedFields = toCopy.projectedFields;
+        this.isLimitPushDown = toCopy.isLimitPushDown;
+        this.limit = toCopy.limit;
+        this.filters = toCopy.filters;
+        this.readableConfig = toCopy.readableConfig;
+        this.producedDataType = toCopy.producedDataType;
+        this.metadataKeys = toCopy.metadataKeys;
+    }
+
+    /**
+     * Creates a source without any projection, limit or filter applied.
+     *
+     * @param loader loader for the Iceberg table
+     * @param schema Flink schema of the table
+     * @param properties table options, forwarded to the source builder as read properties
+     * @param readableConfig Flink configuration
+     */
+    public IcebergTableSource(
+            TableLoader loader,
+            TableSchema schema,
+            Map<String, String> properties,
+            ReadableConfig readableConfig) {
+        this(loader, schema, properties, null, false, null, ImmutableList.of(), readableConfig);
+    }
+
+    private IcebergTableSource(
+            TableLoader loader,
+            TableSchema schema,
+            Map<String, String> properties,
+            int[] projectedFields,
+            boolean isLimitPushDown,
+            Long limit,
+            List<Expression> filters,
+            ReadableConfig readableConfig) {
+        this.loader = loader;
+        this.schema = schema;
+        this.properties = properties;
+        this.projectedFields = projectedFields;
+        this.isLimitPushDown = isLimitPushDown;
+        this.limit = limit;
+        this.filters = filters;
+        this.readableConfig = readableConfig;
+        // Until metadata is applied the source produces the physical row type only.
+        this.producedDataType = schema.toPhysicalRowDataType();
+        this.metadataKeys = new ArrayList<>();
+    }
+
+    /**
+     * Records the top-level field indexes to read.
+     *
+     * @throws IllegalArgumentException if any projection path addresses a nested field
+     */
+    @Override
+    public void applyProjection(int[][] projectFields) {
+        this.projectedFields = new int[projectFields.length];
+        for (int i = 0; i < projectFields.length; i++) {
+            Preconditions.checkArgument(
+                    projectFields[i].length == 1, "Don't support nested projection in iceberg source now.");
+            this.projectedFields[i] = projectFields[i][0];
+        }
+    }
+
+    /** Builds the FLIP-27 {@link IcebergSource} from the pushed-down state and attaches it to {@code env}. */
+    private DataStreamSource<RowData> createFLIP27Stream(
+            StreamExecutionEnvironment env,
+            TypeInformation<RowData> typeInfo) {
+        SplitAssignerType assignerType =
+                readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
+        MetadataConverter[] converters = getMetadataConverters();
+        IcebergSource<RowData> source =
+                IcebergSource.forRowData()
+                        .tableLoader(loader)
+                        .assignerFactory(assignerType.factory())
+                        // setAll is the documented replacement for the deprecated properties(...)
+                        .setAll(properties)
+                        .project(getProjectedSchema())
+                        .limit(limit)
+                        .filters(filters)
+                        .flinkConfig(readableConfig)
+                        .metadataConverters(converters)
+                        .build();
+        return env.fromSource(
+                source,
+                WatermarkStrategy.noWatermarks(),
+                source.name(),
+                typeInfo);
+    }
+
+    /** Returns the full schema, or a schema restricted to the projected fields in projection order. */
+    private TableSchema getProjectedSchema() {
+        if (projectedFields == null) {
+            return schema;
+        }
+        String[] fullNames = schema.getFieldNames();
+        DataType[] fullTypes = schema.getFieldDataTypes();
+        return TableSchema.builder()
+                .fields(
+                        Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
+                        Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new))
+                .build();
+    }
+
+    @Override
+    public void applyLimit(long newLimit) {
+        this.limit = newLimit;
+    }
+
+    /**
+     * Converts the Flink filters that have an Iceberg equivalent and keeps them for the scan.
+     * All incoming filters are still reported as remaining filters.
+     */
+    @Override
+    public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+        List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+        List<Expression> expressions = Lists.newArrayList();
+
+        for (ResolvedExpression resolvedExpression : flinkFilters) {
+            Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+            if (icebergExpression.isPresent()) {
+                expressions.add(icebergExpression.get());
+                acceptedFilters.add(resolvedExpression);
+            }
+        }
+
+        this.filters = expressions;
+        return Result.of(acceptedFilters, flinkFilters);
+    }
+
+    @Override
+    public boolean supportsNestedProjection() {
+        // TODO: support nested projection
+        return false;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+
+        final TypeInformation<RowData> typeInfo =
+                runtimeProviderContext.createTypeInformation(producedDataType);
+        return new DataStreamScanProvider() {
+
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+                return createFLIP27Stream(execEnv, typeInfo);
+            }
+
+            @Override
+            public boolean isBounded() {
+                return FlinkSource.isBounded(properties);
+            }
+        };
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new IcebergTableSource(this);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Iceberg table source";
+    }
+
+    /** Lists every {@link IcebergReadableMetadata} entry as key to data type. */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(IcebergReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                IcebergReadableMetadata::getKey, IcebergReadableMetadata::getDataType));
+
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    /**
+     * Resolves the requested metadata keys to their converters, preserving the key order.
+     *
+     * @throws IllegalStateException if a key does not match any {@link IcebergReadableMetadata}
+     */
+    private MetadataConverter[] getMetadataConverters() {
+        if (CollectionUtils.isEmpty(metadataKeys)) {
+            return new MetadataConverter[0];
+        }
+        return metadataKeys.stream()
+                .map(
+                        key -> Stream.of(IcebergReadableMetadata.values())
+                                .filter(m -> m.getKey().equals(key))
+                                .findFirst()
+                                .orElseThrow(
+                                        () -> new IllegalStateException("Unsupported metadata key: " + key)))
+                .map(IcebergReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
new file mode 100644
index 0000000000..9169b48aaf
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * {@link RecordsWithSplitIds} is used to pass a batch of records from fetcher to source reader.
+ * Batching is to improve the efficiency for records handover.
+ *
+ * <p>{@link RecordsWithSplitIds} interface can encapsulate batches from multiple splits. This is
+ * the case for Kafka source where fetchers can retrieve records from multiple Kafka partitions at
+ * the same time.
+ *
+ * <p>For file-based sources like Iceberg, readers always read one split/file at a time. Hence, we
+ * will only have a batch of records for one split here.
+ *
+ * <p>This class uses array to store a batch of records from the same file (with the same
+ * fileOffset).
+ *
+ * <p>Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class ArrayBatchRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+    @Nullable
+    private String splitId;
+    @Nullable
+    private final Pool.Recycler<T[]> recycler;
+    @Nullable
+    private final T[] records;
+    private final int numberOfRecords;
+    private final Set<String> finishedSplits;
+    // single holder object, re-pointed at each record as the batch is iterated
+    private final RecordAndPosition<T> recordAndPosition;
+
+    // point to current read position within the records array
+    private int position;
+
+    private ArrayBatchRecords(
+            @Nullable String splitId,
+            @Nullable Pool.Recycler<T[]> recycler,
+            @Nullable T[] records,
+            int numberOfRecords,
+            int fileOffset,
+            long startingRecordOffset,
+            Set<String> finishedSplits) {
+        Preconditions.checkArgument(numberOfRecords >= 0, "numberOfRecords can't be negative");
+        Preconditions.checkArgument(fileOffset >= 0, "fileOffset can't be negative");
+        // The message used to name numberOfRecords here, which pointed at the wrong argument.
+        Preconditions.checkArgument(startingRecordOffset >= 0, "startingRecordOffset can't be negative");
+
+        this.splitId = splitId;
+        this.recycler = recycler;
+        this.records = records;
+        this.numberOfRecords = numberOfRecords;
+        this.finishedSplits =
+                Preconditions.checkNotNull(finishedSplits, "finishedSplits can be empty but not null");
+        this.recordAndPosition = new RecordAndPosition<>();
+
+        recordAndPosition.set(null, fileOffset, startingRecordOffset);
+        this.position = 0;
+    }
+
+    /** Returns the split id on the first call and {@code null} afterwards. */
+    @Nullable
+    @Override
+    public String nextSplit() {
+        String nextSplit = this.splitId;
+        // set the splitId to null to indicate no more splits
+        // this class only contains record for one split
+        this.splitId = null;
+        return nextSplit;
+    }
+
+    /**
+     * Returns the shared holder pointed at the next record, or {@code null} once all
+     * {@code numberOfRecords} entries have been handed out.
+     */
+    @Nullable
+    @Override
+    public RecordAndPosition<T> nextRecordFromSplit() {
+        if (position < numberOfRecords) {
+            recordAndPosition.record(records[position]);
+            position++;
+            return recordAndPosition;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * This method is called when all records from this batch have been emitted. If recycler is set,
+     * it should be called to return the records array back to pool.
+     */
+    @Override
+    public void recycle() {
+        if (recycler != null) {
+            recycler.recycle(records);
+        }
+    }
+
+    @Override
+    public Set<String> finishedSplits() {
+        return finishedSplits;
+    }
+
+    @VisibleForTesting
+    T[] records() {
+        return records;
+    }
+
+    @VisibleForTesting
+    int numberOfRecords() {
+        return numberOfRecords;
+    }
+
+    /**
+     * Create an ArrayBatchRecords backed by an array with records from the same file
+     *
+     * @param splitId Iceberg source only read from one split a time. We never have multiple records
+     *     from multiple splits.
+     * @param recycler Because {@link DataIterator} with {@link RowData} returns an iterator of reused
+     *     RowData object, we need to clone RowData eagerly when constructing a batch of records. We
+     *     can use object pool to reuse the RowData array object which can be expensive to create.
+     *     This recycler can be provided to recycle the array object back to pool after read is
+     *     exhausted. If the {@link DataIterator} returns an iterator of non-reused objects, we don't
+     *     need to clone objects. It is cheap to just create the batch array. Hence, we don't need
+     *     object pool and recycler can be set to null.
+     * @param records an array (maybe reused) holding a batch of records
+     * @param numberOfRecords actual number of records in the array
+     * @param fileOffset fileOffset for all records in this batch
+     * @param startingRecordOffset starting recordOffset
+     * @param <T> record type
+     */
+    public static <T> ArrayBatchRecords<T> forRecords(
+            String splitId,
+            Pool.Recycler<T[]> recycler,
+            T[] records,
+            int numberOfRecords,
+            int fileOffset,
+            long startingRecordOffset) {
+        return new ArrayBatchRecords<>(
+                splitId,
+                recycler,
+                records,
+                numberOfRecords,
+                fileOffset,
+                startingRecordOffset,
+                Collections.emptySet());
+    }
+
+    /**
+     * Create an ArrayBatchRecords with only finished split id
+     *
+     * @param splitId for the split that is just exhausted
+     */
+    public static <T> ArrayBatchRecords<T> finishedSplit(String splitId) {
+        return new ArrayBatchRecords<>(null, null, null, 0, 0, 0, Collections.singleton(splitId));
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
new file mode 100644
index 0000000000..2ef897b5d4
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
+import org.apache.flink.connector.file.src.util.Pool;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.DataIteratorBatcher;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Batcher that copies records into arrays borrowed from a recyclable pool.
+ *
+ * <p>Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class ArrayPoolDataIteratorBatcher<T> implements DataIteratorBatcher<T> {
+
+    private final int batchSize;
+    private final int handoverQueueSize;
+    private final RecordFactory<T> recordFactory;
+
+    private transient Pool<T[]> pool;
+
+    /**
+     * @param config supplies the fetch batch record count and the element queue capacity
+     * @param recordFactory creates the batch arrays and clones records into them
+     */
+    ArrayPoolDataIteratorBatcher(ReadableConfig config, RecordFactory<T> recordFactory) {
+        this.recordFactory = recordFactory;
+        this.batchSize = config.get(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT);
+        this.handoverQueueSize = config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY);
+    }
+
+    /**
+     * Wraps the input iterator into an iterator of record batches for the given split.
+     *
+     * @throws IllegalArgumentException if {@code inputIterator} is {@code null}
+     */
+    @Override
+    public CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> batch(
+            String splitId, DataIterator<T> inputIterator) {
+        Preconditions.checkArgument(inputIterator != null, "Input data iterator can't be null");
+        // lazily create pool as it is not serializable
+        if (pool == null) {
+            this.pool = createPoolOfBatches(handoverQueueSize);
+        }
+        return new ArrayPoolBatchIterator(splitId, inputIterator, pool);
+    }
+
+    /** Creates a pool pre-filled with {@code numBatches} arrays of {@code batchSize} slots each. */
+    private Pool<T[]> createPoolOfBatches(int numBatches) {
+        Pool<T[]> batches = new Pool<>(numBatches);
+        int created = 0;
+        while (created < numBatches) {
+            batches.add(recordFactory.createBatch(batchSize));
+            created++;
+        }
+        return batches;
+    }
+
+    /** Iterator that fills one pooled array per {@link #next()} call. */
+    private class ArrayPoolBatchIterator
+            implements
+                CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> {
+
+        private final String splitId;
+        private final DataIterator<T> inputIterator;
+        private final Pool<T[]> pool;
+
+        ArrayPoolBatchIterator(String splitId, DataIterator<T> inputIterator, Pool<T[]> pool) {
+            this.splitId = splitId;
+            this.inputIterator = inputIterator;
+            this.pool = pool;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return inputIterator.hasNext();
+        }
+
+        @Override
+        public RecordsWithSplitIds<RecordAndPosition<T>> next() {
+            if (!inputIterator.hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            T[] target = borrowBatchArray();
+            int filled = 0;
+            // Stop at a file boundary so that every record in the batch has the same fileOffset.
+            boolean moreInCurrentFile = true;
+            while (moreInCurrentFile && inputIterator.hasNext() && filled < batchSize) {
+                // The iterator may hand out a reused object (e.g. RowData), so it must be copied
+                // into the batch before next() is called again.
+                T current = inputIterator.next();
+                recordFactory.clone(current, target, filled);
+                filled++;
+                moreInCurrentFile = inputIterator.currentFileHasNext();
+            }
+
+            int fileOffset = inputIterator.fileOffset();
+            long startingRecordOffset = inputIterator.recordOffset() - filled;
+            return ArrayBatchRecords.forRecords(
+                    splitId, pool.recycler(), target, filled, fileOffset, startingRecordOffset);
+        }
+
+        @Override
+        public void close() throws IOException {
+            inputIterator.close();
+        }
+
+        /** Takes an array from the pool; restores the interrupt flag if interrupted while doing so. */
+        private T[] borrowBatchArray() {
+            try {
+                return pool.pollEntry();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while waiting for array pool entry", e);
+            }
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
new file mode 100644
index 0000000000..beaf321e51
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SplitRequestEvent;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Internal
+public class IcebergSourceReader<T>
+ extends
+ SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T,
IcebergSourceSplit, IcebergSourceSplit> {
+
+ public IcebergSourceReader(
+ InlongIcebergSourceReaderMetrics metrics,
+ ReaderFunction<T> readerFunction,
+ SourceReaderContext context) {
+ super(
+ () -> new IcebergSourceSplitReader<>(metrics, readerFunction,
context),
+ new IcebergSourceRecordEmitter<>(),
+ context.getConfiguration(),
+ context);
+ }
+
+ @Override
+ public void start() {
+ // We request a split only if we did not get splits during the
checkpoint restore.
+ // Otherwise, reader restarts will keep requesting more and more
splits.
+ if (getNumberOfCurrentlyAssignedSplits() == 0) {
+ requestSplit(Collections.emptyList());
+ }
+ }
+
+ @Override
+ protected void onSplitFinished(Map<String, IcebergSourceSplit>
finishedSplitIds) {
+ requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
+ }
+
+ @Override
+ protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
+ return split;
+ }
+
+ @Override
+ protected IcebergSourceSplit toSplitType(String splitId,
IcebergSourceSplit splitState) {
+ return splitState;
+ }
+
+ private void requestSplit(Collection<String> finishedSplitIds) {
+ context.sendSourceEventToCoordinator(new
SplitRequestEvent(finishedSplitIds));
+ }
+}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
similarity index 50%
copy from
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
copy to
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
index 0826eb2a9e..0a928069e0 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/CalculateObjectSizeUtils.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
@@ -15,33 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.base.util;
+package org.apache.inlong.sort.iceberg.source.reader;
-import org.apache.flink.table.data.binary.BinaryRowData;
-
-import java.nio.charset.StandardCharsets;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
/**
- * calculate tool for object
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
*/
-public class CalculateObjectSizeUtils {
+final class IcebergSourceRecordEmitter<T>
+ implements
+ RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> {
- /**
- * {@link BinaryRowData} don't implement the {@link Object#toString} method
- * So, we need use {@link BinaryRowData#getSizeInBytes} to get byte size.
- */
- public static long getDataSize(Object object) {
- if (object == null) {
- return 0L;
- }
- long size;
- if (object instanceof BinaryRowData) {
- BinaryRowData binaryRowData = (BinaryRowData) object;
- size = binaryRowData.getSizeInBytes();
- } else {
- size = object.toString().getBytes(StandardCharsets.UTF_8).length;
- }
- return size;
+ IcebergSourceRecordEmitter() {
}
+    /**
+     * Hands the record to the output, then moves the split's position to the offsets
+     * carried by the element.
+     */
+    @Override
+    public void emitRecord(
+            RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
+        T record = element.record();
+        output.collect(record);
+        split.updatePosition(element.fileOffset(), element.recordOffset());
+    }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
new file mode 100644
index 0000000000..146c2dad7e
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RecordAndPosition;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Queue;
+
+/**
+ * Split reader that works through its assigned {@link IcebergSourceSplit}s one at a time.
+ * Every {@link #fetch()} call returns the next batch of the open split, a finished-split
+ * marker once that split is exhausted, or an empty result when no split is queued.
+ *
+ * <p>Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergSourceSplitReader.class);
+
+    private final InlongIcebergSourceReaderMetrics metrics;
+    private final ReaderFunction<T> openSplitFunction;
+    private final int indexOfSubtask;
+    private final Queue<IcebergSourceSplit> splits;
+
+    private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
+    private IcebergSourceSplit currentSplit;
+    private String currentSplitId;
+
+    IcebergSourceSplitReader(
+            InlongIcebergSourceReaderMetrics metrics,
+            ReaderFunction<T> openSplitFunction,
+            SourceReaderContext context) {
+        this.metrics = metrics;
+        this.openSplitFunction = openSplitFunction;
+        this.indexOfSubtask = context.getIndexOfSubtask();
+        this.splits = new ArrayDeque<>();
+    }
+
+    /**
+     * Returns the next batch of the current split, opening the next queued split first when
+     * none is open.
+     *
+     * @return a record batch, a finished-split marker, or an empty result when no split is queued
+     * @throws IOException the cause of an {@link UncheckedIOException} raised while reading a batch
+     */
+    @Override
+    public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
+        metrics.incrementSplitReaderFetchCalls(1);
+        if (currentReader == null) {
+            IcebergSourceSplit nextSplit = splits.poll();
+            if (nextSplit == null) {
+                // return an empty result, which will lead to split fetch to be idle.
+                // SplitFetcherManager will then close idle fetcher.
+                return new RecordsBySplits<>(Collections.emptyMap(), Collections.emptySet());
+            }
+            currentSplit = nextSplit;
+            currentSplitId = nextSplit.splitId();
+            currentReader = openSplitFunction.apply(currentSplit);
+        }
+
+        if (!currentReader.hasNext()) {
+            return finishSplit();
+        }
+
+        // Because Iterator#next() doesn't support checked exception,
+        // we need to wrap and unwrap the checked IOException with UncheckedIOException
+        try {
+            // The metrics call needs the concrete batch type; the cast relies on the
+            // reader function producing ArrayBatchRecords.
+            ArrayBatchRecords<T> result = (ArrayBatchRecords<T>) currentReader.next();
+            metrics.outputMetricsWithEstimate(result);
+            return result;
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
+        }
+    }
+
+    /**
+     * Queues newly assigned splits and records their count and total file length in the metrics.
+     *
+     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
+     */
+    @Override
+    public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
+        if (!(splitsChange instanceof SplitsAddition)) {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported split change: %s", splitsChange.getClass()));
+        }
+
+        LOG.info("Add {} splits to reader", splitsChange.splits().size());
+        splits.addAll(splitsChange.splits());
+        metrics.incrementAssignedSplits(splitsChange.splits().size());
+        metrics.incrementAssignedBytes(calculateBytes(splitsChange));
+    }
+
+    /** No-op: this reader performs no wake-up handling. */
+    @Override
+    public void wakeUp() {
+    }
+
+    /**
+     * Closes the reader of the split that is currently open, if any, and forgets it so a
+     * closed iterator is never left referenced.
+     */
+    @Override
+    public void close() throws Exception {
+        currentSplitId = null;
+        if (currentReader != null) {
+            try {
+                currentReader.close();
+            } finally {
+                currentReader = null;
+            }
+        }
+    }
+
+    /** Sums the lengths of all file scan tasks of one split. */
+    private long calculateBytes(IcebergSourceSplit split) {
+        return split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum);
+    }
+
+    /** Sums the file lengths of every split contained in the change. */
+    private long calculateBytes(SplitsChange<IcebergSourceSplit> splitsChanges) {
+        return splitsChanges.splits().stream().map(this::calculateBytes).reduce(0L, Long::sum);
+    }
+
+    /**
+     * Closes the current reader, updates the finished-split metrics and returns the marker
+     * batch announcing that the current split is done.
+     */
+    private ArrayBatchRecords<T> finishSplit() throws IOException {
+        if (currentReader != null) {
+            currentReader.close();
+            currentReader = null;
+        }
+
+        ArrayBatchRecords<T> finishRecords = ArrayBatchRecords.finishedSplit(currentSplitId);
+        LOG.info("Split reader {} finished split: {}", indexOfSubtask, currentSplitId);
+        metrics.incrementFinishedSplits(1);
+        metrics.incrementFinishedBytes(calculateBytes(currentSplit));
+        currentSplitId = null;
+        return finishRecords;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
new file mode 100644
index 0000000000..e7afec2551
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
+
+/**
+ * Inlong iceberg source reader metrics. Extends the Iceberg reader metrics with an optional
+ * {@link SourceMetricData} that is only created once {@link #registerMetrics(MetricOption)}
+ * has been called with a non-null option.
+ */
+@Slf4j
+public class InlongIcebergSourceReaderMetrics extends IcebergSourceReaderMetrics {
+
+    private final MetricGroup metrics;
+    // Stays null until registerMetrics succeeds.
+    private SourceMetricData sourceMetricData;
+
+    public InlongIcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
+        super(metrics, fullTableName);
+        this.metrics = metrics;
+    }
+
+    /**
+     * Creates the Inlong source metric data for the given option. A null option only logs a
+     * warning and leaves Inlong metric reporting disabled.
+     *
+     * @param metricOption metric configuration, may be null
+     */
+    public void registerMetrics(MetricOption metricOption) {
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption, metrics);
+        } else {
+            log.warn("failed to init sourceMetricData since the metricOption is null");
+        }
+    }
+
+    /**
+     * Reports the records of a fetched batch to the Inlong source metrics. Does nothing when
+     * no metric data has been registered, so reading never fails because metrics are disabled.
+     *
+     * @param batchRecord the batch whose records are reported
+     */
+    public void outputMetricsWithEstimate(ArrayBatchRecords batchRecord) {
+        if (sourceMetricData == null) {
+            return;
+        }
+        sourceMetricData.outputMetricsWithEstimate(batchRecord.records());
+    }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
new file mode 100644
index 0000000000..d7a5df351f
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.DataTaskReader;
+import org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Reader function for metadata tables (like snapshots, manifests, etc.).
+ *
+ * <p>Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+@Internal
+public class MetaDataReaderFunction extends DataIteratorReaderFunction<RowData> {
+
+    private final Schema readSchema;
+    private final FileIO io;
+    private final EncryptionManager encryption;
+
+    /**
+     * @param config configuration handed to the batcher
+     * @param tableSchema schema of the table, must not be null
+     * @param projectedSchema schema to read; the table schema is read when this is null
+     * @param io file IO used by the data iterator
+     * @param encryption encryption manager used by the data iterator
+     * @param metadataConverters converters handed to the record factory
+     */
+    public MetaDataReaderFunction(
+            ReadableConfig config,
+            Schema tableSchema,
+            Schema projectedSchema,
+            FileIO io,
+            EncryptionManager encryption,
+            MetadataConverter[] metadataConverters) {
+        super(
+                new ArrayPoolDataIteratorBatcher<>(
+                        config,
+                        new RowDataRecordFactory(
+                                FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)),
+                                metadataConverters)));
+        this.readSchema = readSchema(tableSchema, projectedSchema);
+        this.io = io;
+        this.encryption = encryption;
+    }
+
+    /** Builds the iterator that reads the split's task with a {@link DataTaskReader}. */
+    @Override
+    public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
+        DataTaskReader taskReader = new DataTaskReader(readSchema);
+        return new DataIterator<>(taskReader, split.task(), io, encryption);
+    }
+
+    /** Picks the projected schema when one is given, otherwise the (non-null) table schema. */
+    private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {
+        Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
+        if (projectedSchema != null) {
+            return projectedSchema;
+        }
+        return tableSchema;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
new file mode 100644
index 0000000000..1afa412042
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import java.io.Serializable;
+
+/**
+ * In FLIP-27 source, SplitReader#fetch() returns a batch of records. Since DataIterator for
+ * RowData returns an iterator of reused RowData objects, RecordFactory is needed to
+ * (1) create an object array that is recyclable via pool, and
+ * (2) clone each RowData element from DataIterator into the batch array.
+ *
+ * Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+interface RecordFactory<T> extends Serializable {
+
+ /**
+ * Create a batch of records.
+ *
+ * @param batchSize size of the batch array to create
+ * @return the newly created batch array
+ */
+ T[] createBatch(int batchSize);
+
+ /**
+ * Clone record into the specified position of the batch array.
+ *
+ * @param from the record to clone
+ * @param batch the batch array receiving the clone
+ * @param position index in the batch array to write to
+ */
+ void clone(T from, T[] batch, int position);
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
new file mode 100644
index 0000000000..534cb6f7ac
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+/**
+ * Reader function that reads the file scan tasks of a split as {@link RowData}.
+ *
+ * <p>Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
+
+    private final Schema tableSchema;
+    private final Schema readSchema;
+    private final String nameMapping;
+    private final boolean caseSensitive;
+    private final FileIO io;
+    private final EncryptionManager encryption;
+    private final List<Expression> filters;
+
+    /**
+     * @param config configuration handed to the batcher
+     * @param tableSchema schema of the table, must not be null
+     * @param projectedSchema schema to read; the table schema is read when this is null
+     * @param nameMapping name mapping passed to the file scan task reader
+     * @param caseSensitive case sensitivity flag passed to the file scan task reader
+     * @param io file IO used by the data iterator
+     * @param encryption encryption manager used by the data iterator
+     * @param filters filter expressions passed to the file scan task reader
+     * @param metadataConverters converters handed to the record factory
+     */
+    public RowDataReaderFunction(
+            ReadableConfig config,
+            Schema tableSchema,
+            Schema projectedSchema,
+            String nameMapping,
+            boolean caseSensitive,
+            FileIO io,
+            EncryptionManager encryption,
+            List<Expression> filters,
+            MetadataConverter[] metadataConverters) {
+        super(
+                new ArrayPoolDataIteratorBatcher<>(
+                        config,
+                        new RowDataRecordFactory(
+                                FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)),
+                                metadataConverters)));
+        this.tableSchema = tableSchema;
+        this.readSchema = readSchema(tableSchema, projectedSchema);
+        this.nameMapping = nameMapping;
+        this.caseSensitive = caseSensitive;
+        this.io = io;
+        this.encryption = encryption;
+        this.filters = filters;
+    }
+
+    /** Builds the iterator that reads the split's task with a {@link RowDataFileScanTaskReader}. */
+    @Override
+    public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
+        RowDataFileScanTaskReader taskReader =
+                new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters);
+        return new DataIterator<>(taskReader, split.task(), io, encryption);
+    }
+
+    /** Picks the projected schema when one is given, otherwise the (non-null) table schema. */
+    private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {
+        Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
+        if (projectedSchema != null) {
+            return projectedSchema;
+        }
+        return tableSchema;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
new file mode 100644
index 0000000000..80110cf014
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.reader;
+
+import
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+import org.apache.inlong.sort.iceberg.source.utils.RecyclableJoinedRowData;
+import org.apache.inlong.sort.iceberg.source.utils.RowDataCloneUtil;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Record factory producing {@link RecyclableJoinedRowData} rows that pair the physical
+ * columns with the metadata columns computed by the configured converters.
+ *
+ * <p>Copy from iceberg-flink:iceberg-flink-1.15:1.3.1
+ */
+class RowDataRecordFactory implements RecordFactory<RowData> {
+
+    private final RowType rowType;
+    private final TypeSerializer[] fieldSerializers;
+    private final MetadataConverter[] metadataConverters;
+
+    RowDataRecordFactory(RowType rowType, MetadataConverter[] metadataConverters) {
+        this.rowType = rowType;
+        this.fieldSerializers = createFieldSerializers(rowType);
+        this.metadataConverters = metadataConverters;
+    }
+
+    /** Creates one internal serializer per field of the row type, in field order. */
+    static TypeSerializer[] createFieldSerializers(RowType rowType) {
+        final int fieldCount = rowType.getFieldCount();
+        TypeSerializer[] serializers = new TypeSerializer[fieldCount];
+        for (int field = 0; field < fieldCount; field++) {
+            serializers[field] = InternalSerializers.create(rowType.getTypeAt(field));
+        }
+        return serializers;
+    }
+
+    /** Creates a batch array pre-filled with empty recyclable rows. */
+    @Override
+    public RowData[] createBatch(int batchSize) {
+        final int physicalSize = rowType.getFieldCount();
+        final int metaSize = metadataConverters.length;
+        RowData[] rows = new RowData[batchSize];
+        for (int slot = 0; slot < rows.length; slot++) {
+            rows[slot] = new RecyclableJoinedRowData(physicalSize, metaSize);
+        }
+        return rows;
+    }
+
+    /**
+     * Copies {@code from} into the batch slot, reusing the slot's recyclable row when it has one.
+     */
+    @Override
+    public void clone(RowData from, RowData[] batch, int position) {
+        RowData slot = batch[position];
+        RecyclableJoinedRowData target = slot instanceof RecyclableJoinedRowData
+                ? (RecyclableJoinedRowData) slot
+                : new RecyclableJoinedRowData(rowType.getFieldCount(), metadataConverters.length);
+
+        RowData physical =
+                RowDataCloneUtil.clonePhysical(from, target.getPhysicalRowData(), rowType, fieldSerializers);
+        RowData meta = RowDataCloneUtil.cloneMeta(from, target.getMetaRowData(), metadataConverters);
+        batch[position] = target.replace(physical.getRowKind(), physical, meta);
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
new file mode 100644
index 0000000000..28327d2d8d
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RecyclableJoinedRowData.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Objects;
+
+/**
+ * A {@link RowData} view over two rows laid end to end: positions below the physical row's
+ * arity are served by the physical row, all later positions by the meta row. Both member
+ * rows can be read back and swapped through {@link #replace}, so an instance can be recycled.
+ */
+@PublicEvolving
+public class RecyclableJoinedRowData implements RowData {
+
+    private RowKind rowKind = RowKind.INSERT;
+    private RowData physicalRowData;
+    private RowData metaRowData;
+
+    /** Creates an instance whose member rows are unset until {@link #replace} is called. */
+    public RecyclableJoinedRowData() {
+    }
+
+    /** Creates an instance backed by two fresh {@link GenericRowData} of the given sizes. */
+    public RecyclableJoinedRowData(int physicalSize, int metaSize) {
+        physicalRowData = new GenericRowData(physicalSize);
+        metaRowData = new GenericRowData(metaSize);
+    }
+
+    /**
+     * Swaps in a new row kind and new member rows.
+     *
+     * @return this instance
+     */
+    public RecyclableJoinedRowData replace(RowKind rowKind, RowData physicalRowData, RowData metaRowData) {
+        this.rowKind = rowKind;
+        this.physicalRowData = physicalRowData;
+        this.metaRowData = metaRowData;
+        return this;
+    }
+
+    public RowData getPhysicalRowData() {
+        return physicalRowData;
+    }
+
+    public RowData getMetaRowData() {
+        return metaRowData;
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    /** True when the joined position falls inside the physical row. */
+    private boolean inPhysical(int pos) {
+        return pos < physicalRowData.getArity();
+    }
+
+    /** Translates a joined position into a position inside the meta row. */
+    private int metaIndex(int pos) {
+        return pos - physicalRowData.getArity();
+    }
+
+    @Override
+    public int getArity() {
+        return physicalRowData.getArity() + metaRowData.getArity();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return rowKind;
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        this.rowKind = kind;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return inPhysical(pos) ? physicalRowData.isNullAt(pos) : metaRowData.isNullAt(metaIndex(pos));
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return inPhysical(pos) ? physicalRowData.getBoolean(pos) : metaRowData.getBoolean(metaIndex(pos));
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return inPhysical(pos) ? physicalRowData.getByte(pos) : metaRowData.getByte(metaIndex(pos));
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return inPhysical(pos) ? physicalRowData.getShort(pos) : metaRowData.getShort(metaIndex(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return inPhysical(pos) ? physicalRowData.getInt(pos) : metaRowData.getInt(metaIndex(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return inPhysical(pos) ? physicalRowData.getLong(pos) : metaRowData.getLong(metaIndex(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return inPhysical(pos) ? physicalRowData.getFloat(pos) : metaRowData.getFloat(metaIndex(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return inPhysical(pos) ? physicalRowData.getDouble(pos) : metaRowData.getDouble(metaIndex(pos));
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return inPhysical(pos) ? physicalRowData.getString(pos) : metaRowData.getString(metaIndex(pos));
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return inPhysical(pos)
+                ? physicalRowData.getDecimal(pos, precision, scale)
+                : metaRowData.getDecimal(metaIndex(pos), precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return inPhysical(pos)
+                ? physicalRowData.getTimestamp(pos, precision)
+                : metaRowData.getTimestamp(metaIndex(pos), precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        return inPhysical(pos)
+                ? physicalRowData.<T>getRawValue(pos)
+                : metaRowData.<T>getRawValue(metaIndex(pos));
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return inPhysical(pos) ? physicalRowData.getBinary(pos) : metaRowData.getBinary(metaIndex(pos));
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return inPhysical(pos) ? physicalRowData.getArray(pos) : metaRowData.getArray(metaIndex(pos));
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return inPhysical(pos) ? physicalRowData.getMap(pos) : metaRowData.getMap(metaIndex(pos));
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        return inPhysical(pos)
+                ? physicalRowData.getRow(pos, numFields)
+                : metaRowData.getRow(metaIndex(pos), numFields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RecyclableJoinedRowData other = (RecyclableJoinedRowData) o;
+        boolean sameKind = Objects.equals(rowKind, other.rowKind);
+        return sameKind
+                && Objects.equals(physicalRowData, other.physicalRowData)
+                && Objects.equals(metaRowData, other.metaRowData);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowKind, physicalRowData, metaRowData);
+    }
+
+    @Override
+    public String toString() {
+        return rowKind.shortString() + "{" + "physicalRowData=" + physicalRowData
+                + ", metaRowData=" + metaRowData + '}';
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java
new file mode 100644
index 0000000000..58e541f0f1
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/utils/RowDataCloneUtil.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.source.utils;
+
+import
org.apache.inlong.sort.iceberg.IcebergReadableMetadata.MetadataConverter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Static helpers that copy a {@link RowData} into a {@link GenericRowData}, optionally recycling
+ * a previously allocated target row.
+ *
+ * <p>{@link #cloneMeta} builds the metadata part of a record by running a set of
+ * {@link MetadataConverter}s over the source row; {@link #clonePhysical} copies the physical
+ * fields one by one through their type serializers.
+ */
+public class RowDataCloneUtil {
+
+    private RowDataCloneUtil() {
+    }
+
+    /**
+     * Builds the metadata row for {@code from}: field {@code i} of the result is whatever
+     * {@code converters[i].read(from)} returns.
+     *
+     * <p>The result holds one field per converter, so a freshly allocated row has arity
+     * {@code converters.length} rather than the arity of {@code from}. Sizing it after
+     * {@code from} overflowed the row whenever there were more converters than source fields.
+     *
+     * @param from the row the converters read from
+     * @param reuse a row to recycle; it is used only when it is a {@link GenericRowData} with
+     *        room for every converter, otherwise a new row is allocated
+     * @param converters the metadata extractors, one per output field
+     * @return the recycled or newly allocated row with all metadata fields set
+     */
+    public static RowData cloneMeta(
+            RowData from, RowData reuse, MetadataConverter[] converters) {
+        final int metaArity = converters.length;
+        GenericRowData ret;
+        // Recycle only a row that can hold every metadata field; a too-small row would make
+        // setField throw ArrayIndexOutOfBoundsException in the loop below.
+        if (reuse instanceof GenericRowData && reuse.getArity() >= metaArity) {
+            ret = (GenericRowData) reuse;
+        } else {
+            ret = new GenericRowData(metaArity);
+        }
+
+        for (int i = 0; i < metaArity; i++) {
+            Object meta = converters[i].read(from);
+            ret.setField(i, meta);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Copies the first {@code rowType.getFieldCount()} fields of {@code from} into a
+     * {@link GenericRowData}, carrying over the row kind.
+     *
+     * <p>Non-null fields are read with a field getter created for the field's logical type and
+     * duplicated with {@code fieldSerializers[i].copy(...)}; null fields are stored as
+     * {@code null}.
+     *
+     * @param from the row to copy
+     * @param reuse a row to recycle; used when it is a {@link GenericRowData}, otherwise a new
+     *        row with the arity of {@code from} is allocated
+     * @param rowType describes the fields to copy
+     * @param fieldSerializers one serializer per field of {@code rowType}, in field order
+     * @return the recycled or newly allocated copy
+     */
+    public static RowData clonePhysical(
+            RowData from, RowData reuse, RowType rowType, TypeSerializer[] fieldSerializers) {
+        GenericRowData ret;
+        if (reuse instanceof GenericRowData) {
+            ret = (GenericRowData) reuse;
+        } else {
+            // NOTE(review): the arity of this row presumably determines where metadata
+            // positions start in the joined row, so it is deliberately left as from.getArity()
+            // - confirm against RecyclableJoinedRowData before changing.
+            ret = new GenericRowData(from.getArity());
+        }
+        ret.setRowKind(from.getRowKind());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            if (!from.isNullAt(i)) {
+                RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+                ret.setField(i, fieldSerializers[i].copy(getter.getFieldOrNull(from)));
+            } else {
+                ret.setField(i, null);
+            }
+        }
+        return ret;
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
index ddbf4ebba4..2c5f0dd56f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-flink-dependencies/pom.xml
@@ -69,6 +69,11 @@
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index d9f7cf28f3..e83548b61e 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -768,6 +768,16 @@
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkCatalogFactory.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkEnvironmentContext.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayBatchRecords.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/ArrayPoolDataIteratorBatcher.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceRecordEmitter.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceSplitReader.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/MetaDataReaderFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RecordFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataReaderFunction.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/RowDataRecordFactory.java
Source : iceberg-flink:iceberg-flink-1.15:1.3.1 (Please note that the
software have been modified.)
License : https://github.com/apache/iceberg/LICENSE
diff --git a/pom.xml b/pom.xml
index 576f0c73e6..d30034e7a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -205,6 +205,7 @@
<otel.version>1.28.0</otel.version>
<tencentcloud-api.version>3.1.830</tencentcloud-api.version>
<woodstox-core.version>5.4.0</woodstox-core.version>
+ <libfb303.version>0.9.3</libfb303.version>
</properties>
<dependencyManagement>